format with Black

This commit is contained in:
Gilles Boccon-Gibod
2022-12-10 08:53:51 -08:00
parent 297246fa4c
commit 135df0dcc0
104 changed files with 8646 additions and 5766 deletions

View File

@@ -43,56 +43,90 @@ from bumble.gatt import (
GATT_PROTOCOL_MODE_CHARACTERISTIC,
GATT_HID_INFORMATION_CHARACTERISTIC,
GATT_HID_CONTROL_POINT_CHARACTERISTIC,
GATT_REPORT_REFERENCE_DESCRIPTOR
GATT_REPORT_REFERENCE_DESCRIPTOR,
)
# -----------------------------------------------------------------------------
# Protocol Modes
HID_BOOT_PROTOCOL = 0x00
HID_BOOT_PROTOCOL = 0x00
HID_REPORT_PROTOCOL = 0x01
# Report Types
HID_INPUT_REPORT = 0x01
HID_OUTPUT_REPORT = 0x02
HID_INPUT_REPORT = 0x01
HID_OUTPUT_REPORT = 0x02
HID_FEATURE_REPORT = 0x03
# Report Map
HID_KEYBOARD_REPORT_MAP = bytes([
0x05, 0x01, # Usage Page (Generic Desktop Ctrls)
0x09, 0x06, # Usage (Keyboard)
0xA1, 0x01, # Collection (Application)
0x85, 0x01, # . Report ID (1)
0x05, 0x07, # . Usage Page (Kbrd/Keypad)
0x19, 0xE0, # . Usage Minimum (0xE0)
0x29, 0xE7, # . Usage Maximum (0xE7)
0x15, 0x00, # . Logical Minimum (0)
0x25, 0x01, # . Logical Maximum (1)
0x75, 0x01, # . Report Size (1)
0x95, 0x08, # . Report Count (8)
0x81, 0x02, # . Input (Data,Var,Abs,No Wrap,Linear,Preferred State,No Null Position)
0x95, 0x01, # . Report Count (1)
0x75, 0x08, # . Report Size (8)
0x81, 0x01, # . Input (Const,Array,Abs,No Wrap,Linear,Preferred State,No Null Position)
0x95, 0x06, # . Report Count (6)
0x75, 0x08, # . Report Size (8)
0x15, 0x00, # . Logical Minimum (0x00)
0x25, 0x94, # . Logical Maximum (0x94)
0x05, 0x07, # . Usage Page (Kbrd/Keypad)
0x19, 0x00, # . Usage Minimum (0x00)
0x29, 0x94, # . Usage Maximum (0x94)
0x81, 0x00, # . Input (Data,Array,Abs,No Wrap,Linear,Preferred State,No Null Position)
0x95, 0x05, # . Report Count (5)
0x75, 0x01, # . Report Size (1)
0x05, 0x08, # . Usage Page (LEDs)
0x19, 0x01, # . Usage Minimum (Num Lock)
0x29, 0x05, # . Usage Maximum (Kana)
0x91, 0x02, # . Output (Data,Var,Abs,No Wrap,Linear,Preferred State,No Null Position,Non-volatile)
0x95, 0x01, # . Report Count (1)
0x75, 0x03, # . Report Size (3)
0x91, 0x01, # . Output (Const,Array,Abs,No Wrap,Linear,Preferred State,No Null Position,Non-volatile)
0xC0 # End Collection
])
HID_KEYBOARD_REPORT_MAP = bytes(
[
0x05,
0x01, # Usage Page (Generic Desktop Ctrls)
0x09,
0x06, # Usage (Keyboard)
0xA1,
0x01, # Collection (Application)
0x85,
0x01, # . Report ID (1)
0x05,
0x07, # . Usage Page (Kbrd/Keypad)
0x19,
0xE0, # . Usage Minimum (0xE0)
0x29,
0xE7, # . Usage Maximum (0xE7)
0x15,
0x00, # . Logical Minimum (0)
0x25,
0x01, # . Logical Maximum (1)
0x75,
0x01, # . Report Size (1)
0x95,
0x08, # . Report Count (8)
0x81,
0x02, # . Input (Data,Var,Abs,No Wrap,Linear,Preferred State,No Null Position)
0x95,
0x01, # . Report Count (1)
0x75,
0x08, # . Report Size (8)
0x81,
0x01, # . Input (Const,Array,Abs,No Wrap,Linear,Preferred State,No Null Position)
0x95,
0x06, # . Report Count (6)
0x75,
0x08, # . Report Size (8)
0x15,
0x00, # . Logical Minimum (0x00)
0x25,
0x94, # . Logical Maximum (0x94)
0x05,
0x07, # . Usage Page (Kbrd/Keypad)
0x19,
0x00, # . Usage Minimum (0x00)
0x29,
0x94, # . Usage Maximum (0x94)
0x81,
0x00, # . Input (Data,Array,Abs,No Wrap,Linear,Preferred State,No Null Position)
0x95,
0x05, # . Report Count (5)
0x75,
0x01, # . Report Size (1)
0x05,
0x08, # . Usage Page (LEDs)
0x19,
0x01, # . Usage Minimum (Num Lock)
0x29,
0x05, # . Usage Maximum (Kana)
0x91,
0x02, # . Output (Data,Var,Abs,No Wrap,Linear,Preferred State,No Null Position,Non-volatile)
0x95,
0x01, # . Report Count (1)
0x75,
0x03, # . Report Size (3)
0x91,
0x01, # . Output (Const,Array,Abs,No Wrap,Linear,Preferred State,No Null Position,Non-volatile)
0xC0, # End Collection
]
)
# -----------------------------------------------------------------------------
@@ -133,31 +167,41 @@ async def keyboard_host(device, peer_address):
return
await peer.discover_characteristics()
protocol_mode_characteristics = peer.get_characteristics_by_uuid(GATT_PROTOCOL_MODE_CHARACTERISTIC)
protocol_mode_characteristics = peer.get_characteristics_by_uuid(
GATT_PROTOCOL_MODE_CHARACTERISTIC
)
if not protocol_mode_characteristics:
print(color('!!! No Protocol Mode characteristic', 'red'))
return
protocol_mode_characteristic = protocol_mode_characteristics[0]
hid_information_characteristics = peer.get_characteristics_by_uuid(GATT_HID_INFORMATION_CHARACTERISTIC)
hid_information_characteristics = peer.get_characteristics_by_uuid(
GATT_HID_INFORMATION_CHARACTERISTIC
)
if not hid_information_characteristics:
print(color('!!! No HID Information characteristic', 'red'))
return
hid_information_characteristic = hid_information_characteristics[0]
report_map_characteristics = peer.get_characteristics_by_uuid(GATT_REPORT_MAP_CHARACTERISTIC)
report_map_characteristics = peer.get_characteristics_by_uuid(
GATT_REPORT_MAP_CHARACTERISTIC
)
if not report_map_characteristics:
print(color('!!! No Report Map characteristic', 'red'))
return
report_map_characteristic = report_map_characteristics[0]
control_point_characteristics = peer.get_characteristics_by_uuid(GATT_HID_CONTROL_POINT_CHARACTERISTIC)
control_point_characteristics = peer.get_characteristics_by_uuid(
GATT_HID_CONTROL_POINT_CHARACTERISTIC
)
if not control_point_characteristics:
print(color('!!! No Control Point characteristic', 'red'))
return
# control_point_characteristic = control_point_characteristics[0]
report_characteristics = peer.get_characteristics_by_uuid(GATT_REPORT_CHARACTERISTIC)
report_characteristics = peer.get_characteristics_by_uuid(
GATT_REPORT_CHARACTERISTIC
)
if not report_characteristics:
print(color('!!! No Report characteristic', 'red'))
return
@@ -165,13 +209,20 @@ async def keyboard_host(device, peer_address):
print(color('REPORT:', 'yellow'), characteristic)
if characteristic.properties & Characteristic.NOTIFY:
await peer.discover_descriptors(characteristic)
report_reference_descriptor = characteristic.get_descriptor(GATT_REPORT_REFERENCE_DESCRIPTOR)
report_reference_descriptor = characteristic.get_descriptor(
GATT_REPORT_REFERENCE_DESCRIPTOR
)
if report_reference_descriptor:
report_reference = await peer.read_value(report_reference_descriptor)
print(color(' Report Reference:', 'blue'), report_reference.hex())
else:
report_reference = bytes([0, 0])
await peer.subscribe(characteristic, lambda value, param=f'[{i}] {report_reference.hex()}': on_report(param, value))
await peer.subscribe(
characteristic,
lambda value, param=f'[{i}] {report_reference.hex()}': on_report(
param, value
),
)
protocol_mode = await peer.read_value(protocol_mode_characteristic)
print(f'Protocol Mode: {protocol_mode.hex()}')
@@ -192,77 +243,91 @@ async def keyboard_device(device, command):
Characteristic.READABLE | Characteristic.WRITEABLE,
bytes([0, 0, 0, 0, 0, 0, 0, 0]),
[
Descriptor(GATT_REPORT_REFERENCE_DESCRIPTOR, Descriptor.READABLE, bytes([0x01, HID_INPUT_REPORT]))
]
Descriptor(
GATT_REPORT_REFERENCE_DESCRIPTOR,
Descriptor.READABLE,
bytes([0x01, HID_INPUT_REPORT]),
)
],
)
# Create an 'output report' characteristic to receive keyboard reports from the host
output_report_characteristic = Characteristic(
GATT_REPORT_CHARACTERISTIC,
Characteristic.READ | Characteristic.WRITE | Characteristic.WRITE_WITHOUT_RESPONSE,
Characteristic.READ
| Characteristic.WRITE
| Characteristic.WRITE_WITHOUT_RESPONSE,
Characteristic.READABLE | Characteristic.WRITEABLE,
bytes([0]),
[
Descriptor(GATT_REPORT_REFERENCE_DESCRIPTOR, Descriptor.READABLE, bytes([0x01, HID_OUTPUT_REPORT]))
]
Descriptor(
GATT_REPORT_REFERENCE_DESCRIPTOR,
Descriptor.READABLE,
bytes([0x01, HID_OUTPUT_REPORT]),
)
],
)
# Add the services to the GATT sever
device.add_services([
Service(
GATT_DEVICE_INFORMATION_SERVICE,
[
Characteristic(
GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
Characteristic.READ,
Characteristic.READABLE,
'Bumble'
)
]
),
Service(
GATT_HUMAN_INTERFACE_DEVICE_SERVICE,
[
Characteristic(
GATT_PROTOCOL_MODE_CHARACTERISTIC,
Characteristic.READ,
Characteristic.READABLE,
bytes([HID_REPORT_PROTOCOL])
),
Characteristic(
GATT_HID_INFORMATION_CHARACTERISTIC,
Characteristic.READ,
Characteristic.READABLE,
bytes([0x11, 0x01, 0x00, 0x03]) # bcdHID=1.1, bCountryCode=0x00, Flags=RemoteWake|NormallyConnectable
),
Characteristic(
GATT_HID_CONTROL_POINT_CHARACTERISTIC,
Characteristic.WRITE_WITHOUT_RESPONSE,
Characteristic.WRITEABLE,
CharacteristicValue(write=on_hid_control_point_write)
),
Characteristic(
GATT_REPORT_MAP_CHARACTERISTIC,
Characteristic.READ,
Characteristic.READABLE,
HID_KEYBOARD_REPORT_MAP
),
input_report_characteristic,
output_report_characteristic
]
),
Service(
GATT_BATTERY_SERVICE,
[
Characteristic(
GATT_BATTERY_LEVEL_CHARACTERISTIC,
Characteristic.READ,
Characteristic.READABLE,
bytes([100])
)
]
)
])
device.add_services(
[
Service(
GATT_DEVICE_INFORMATION_SERVICE,
[
Characteristic(
GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
Characteristic.READ,
Characteristic.READABLE,
'Bumble',
)
],
),
Service(
GATT_HUMAN_INTERFACE_DEVICE_SERVICE,
[
Characteristic(
GATT_PROTOCOL_MODE_CHARACTERISTIC,
Characteristic.READ,
Characteristic.READABLE,
bytes([HID_REPORT_PROTOCOL]),
),
Characteristic(
GATT_HID_INFORMATION_CHARACTERISTIC,
Characteristic.READ,
Characteristic.READABLE,
bytes(
[0x11, 0x01, 0x00, 0x03]
), # bcdHID=1.1, bCountryCode=0x00, Flags=RemoteWake|NormallyConnectable
),
Characteristic(
GATT_HID_CONTROL_POINT_CHARACTERISTIC,
Characteristic.WRITE_WITHOUT_RESPONSE,
Characteristic.WRITEABLE,
CharacteristicValue(write=on_hid_control_point_write),
),
Characteristic(
GATT_REPORT_MAP_CHARACTERISTIC,
Characteristic.READ,
Characteristic.READABLE,
HID_KEYBOARD_REPORT_MAP,
),
input_report_characteristic,
output_report_characteristic,
],
),
Service(
GATT_BATTERY_SERVICE,
[
Characteristic(
GATT_BATTERY_LEVEL_CHARACTERISTIC,
Characteristic.READ,
Characteristic.READABLE,
bytes([100]),
)
],
),
]
)
# Debug print
for attribute in device.gatt_server.attributes:
@@ -270,13 +335,20 @@ async def keyboard_device(device, command):
# Set the advertising data
device.advertising_data = bytes(
AdvertisingData([
(AdvertisingData.COMPLETE_LOCAL_NAME, bytes('Bumble Keyboard', 'utf-8')),
(AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
bytes(GATT_HUMAN_INTERFACE_DEVICE_SERVICE)),
(AdvertisingData.APPEARANCE, struct.pack('<H', 0x03C1)),
(AdvertisingData.FLAGS, bytes([0x05]))
])
AdvertisingData(
[
(
AdvertisingData.COMPLETE_LOCAL_NAME,
bytes('Bumble Keyboard', 'utf-8'),
),
(
AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
bytes(GATT_HUMAN_INTERFACE_DEVICE_SERVICE),
),
(AdvertisingData.APPEARANCE, struct.pack('<H', 0x03C1)),
(AdvertisingData.FLAGS, bytes([0x05])),
]
)
)
# Attach a listener
@@ -303,14 +375,21 @@ async def keyboard_device(device, command):
code = ord(key)
if code >= ord('a') and code <= ord('z'):
hid_code = 0x04 + code - ord('a')
input_report_characteristic.value = bytes([0, 0, hid_code, 0, 0, 0, 0, 0])
await device.notify_subscribers(input_report_characteristic)
input_report_characteristic.value = bytes(
[0, 0, hid_code, 0, 0, 0, 0, 0]
)
await device.notify_subscribers(
input_report_characteristic
)
elif message_type == 'keyup':
input_report_characteristic.value = bytes.fromhex('0000000000000000')
input_report_characteristic.value = bytes.fromhex(
'0000000000000000'
)
await device.notify_subscribers(input_report_characteristic)
except websockets.exceptions.ConnectionClosedOK:
pass
await websockets.serve(serve, 'localhost', 8989)
await asyncio.get_event_loop().create_future()
else:
@@ -321,7 +400,9 @@ async def keyboard_device(device, command):
# Keypress for the letter
keycode = 0x04 + letter - 0x61
input_report_characteristic.value = bytes([0, 0, keycode, 0, 0, 0, 0, 0])
input_report_characteristic.value = bytes(
[0, 0, keycode, 0, 0, 0, 0, 0]
)
await device.notify_subscribers(input_report_characteristic)
# Key release
@@ -335,10 +416,16 @@ async def main():
print('Usage: python keyboard.py <device-config> <transport-spec> <command>')
print(' where <command> is one of:')
print(' connect <address> (run a keyboard host, connecting to a keyboard)')
print(' web (run a keyboard with keypress input from a web page, see keyboard.html')
print(' sim (run a keyboard simulation, emitting a canned sequence of keystrokes')
print(
' web (run a keyboard with keypress input from a web page, see keyboard.html'
)
print(
' sim (run a keyboard simulation, emitting a canned sequence of keystrokes'
)
print('example: python keyboard.py keyboard.json usb:0 sim')
print('example: python keyboard.py keyboard.json usb:0 connect A0:A1:A2:A3:A4:A5')
print(
'example: python keyboard.py keyboard.json usb:0 connect A0:A1:A2:A3:A4:A5'
)
return
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
@@ -355,5 +442,5 @@ async def main():
# -----------------------------------------------------------------------------
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())