Menu and name change review comments fix

This commit is contained in:
skarnataki
2023-11-23 15:43:18 +00:00
parent 6ab41c466f
commit 0578e84586
3 changed files with 53 additions and 42 deletions

View File

@@ -345,7 +345,7 @@ class HID(EventEmitter):
def send_pdu_on_intr(self, msg: bytes) -> None: def send_pdu_on_intr(self, msg: bytes) -> None:
self.l2cap_intr_channel.send_pdu(msg) # type: ignore self.l2cap_intr_channel.send_pdu(msg) # type: ignore
def send_data(self, data: bytes) -> None: def send_report_on_interrupt(self, data: bytes) -> None:
if self.role == HID.Role.HOST: if self.role == HID.Role.HOST:
report_type = Message.ReportType.OUTPUT_REPORT report_type = Message.ReportType.OUTPUT_REPORT
else: else:
@@ -375,12 +375,12 @@ class Device(HID):
ERR_UNKNOWN = 0x03 ERR_UNKNOWN = 0x03
SUCCESS = 0xff SUCCESS = 0xff
class GetSetStatus(): class GetSetStatus():
def __init__(self) -> None: def __init__(self) -> None:
self.status = 0 self.status = 0
self.data=None self.data=None
def __init__(self, device: Device) -> None: def __init__(self, device: Device) -> None:
super().__init__(device, HID.Role.DEVICE) super().__init__(device, HID.Role.DEVICE)
self.get_report_cb = None self.get_report_cb = None
@@ -399,7 +399,7 @@ class Device(HID):
hid_message = bytes(msg) hid_message = bytes(msg)
logger.debug(f'>>> HID CONTROL DATA: {hid_message.hex()}') logger.debug(f'>>> HID CONTROL DATA: {hid_message.hex()}')
self.send_pdu_on_ctrl(hid_message) self.send_pdu_on_ctrl(hid_message)
def handle_get_report(self, pdu: bytes): def handle_get_report(self, pdu: bytes):
ret = self.GetSetStatus() ret = self.GetSetStatus()
report_type=pdu[0] & 0x03 report_type=pdu[0] & 0x03
@@ -413,7 +413,7 @@ class Device(HID):
if(self.get_report_cb != None): if(self.get_report_cb != None):
ret = self.get_report_cb(report_id, report_type, buffer_size) ret = self.get_report_cb(report_id, report_type, buffer_size)
if(ret.status == self.GetSetReturn.FAILURE): if(ret.status == self.GetSetReturn.FAILURE):
self.send_handshake_message(Message.Handshake.ERR_UNKNOWN) self.send_handshake_message(Message.Handshake.ERR_UNKNOWN)
elif(ret.status == self.GetSetReturn.SUCCESS): elif(ret.status == self.GetSetReturn.SUCCESS):
@@ -424,19 +424,19 @@ class Device(HID):
self.send_control_data(report_type=report_type, data = data) self.send_control_data(report_type=report_type, data = data)
else: else:
self.send_handshake_message(Message.Handshake.ERR_INVALID_PARAMETER) self.send_handshake_message(Message.Handshake.ERR_INVALID_PARAMETER)
elif(ret.status == self.GetSetReturn.REPORT_ID_NOT_FOUND): elif(ret.status == self.GetSetReturn.REPORT_ID_NOT_FOUND):
self.send_handshake_message(Message.Handshake.ERR_INVALID_REPORT_ID) self.send_handshake_message(Message.Handshake.ERR_INVALID_REPORT_ID)
elif(ret.status == self.GetSetReturn.ERR_UNSUPPORTED_REQUEST): elif(ret.status == self.GetSetReturn.ERR_UNSUPPORTED_REQUEST):
self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST) self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST)
else: else:
logger.debug("GetReport callback not registered !!") logger.debug("GetReport callback not registered !!")
self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST) self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST)
def register_get_report_cb(self,cb): def register_get_report_cb(self,cb):
self.get_report_cb=cb self.get_report_cb=cb
logger.debug("GetReport callback registered successfully") logger.debug("GetReport callback registered successfully")
def handle_set_report(self, pdu: bytes): def handle_set_report(self, pdu: bytes):
if(self.set_report_cb != None): if(self.set_report_cb != None):
report_type=pdu[0] & 0x03 report_type=pdu[0] & 0x03
@@ -448,13 +448,13 @@ class Device(HID):
return return
else: else:
logger.debug("SetReport callback not registered !!") logger.debug("SetReport callback not registered !!")
self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST) self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST)
def register_set_report_cb(self, cb): def register_set_report_cb(self, cb):
self.set_report_cb=cb self.set_report_cb=cb
logger.debug("SetReport callback registered successfully") logger.debug("SetReport callback registered successfully")
def handle_get_protocol(self, pdu: bytes): def handle_get_protocol(self, pdu: bytes):
ret = self.GetSetStatus() ret = self.GetSetStatus()
if(self.get_protocol_cb != None): if(self.get_protocol_cb != None):
@@ -470,7 +470,7 @@ class Device(HID):
def register_get_protocol_cb(self, cb): def register_get_protocol_cb(self, cb):
self.get_protocol_cb=cb self.get_protocol_cb=cb
logger.debug("GetProtocol callback registered successfully") logger.debug("GetProtocol callback registered successfully")
def handle_set_protocol(self, pdu: bytes): def handle_set_protocol(self, pdu: bytes):
ret = self.GetSetStatus() ret = self.GetSetStatus()
if(self.set_protocol_cb != None): if(self.set_protocol_cb != None):
@@ -479,10 +479,10 @@ class Device(HID):
return return
else: else:
logger.debug("SetProtocol callback not registered !!") logger.debug("SetProtocol callback not registered !!")
self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST) self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST)
def register_set_protocol_cb(self, cb): def register_set_protocol_cb(self, cb):
self.set_protocol_cb=cb self.set_protocol_cb=cb
logger.debug("SetProtocol callback registered successfully") logger.debug("SetProtocol callback registered successfully")

View File

@@ -451,10 +451,10 @@ async def keyboard_device(hid_device, command):
if ord('a') <= code <= ord('z'): if ord('a') <= code <= ord('z'):
hid_code = 0x04 + code - ord('a') hid_code = 0x04 + code - ord('a')
deviceData.setKeyBoardData(bytearray([0x01, 0x00, 0x00, hid_code, 0x00, 0x00, 0x00, 0x00, 0x00])) deviceData.setKeyBoardData(bytearray([0x01, 0x00, 0x00, hid_code, 0x00, 0x00, 0x00, 0x00, 0x00]))
hid_device.send_data(deviceData.getKeyBoardData()) hid_device.send_report_on_interrupt(deviceData.getKeyBoardData())
elif message_type == 'keyup': elif message_type == 'keyup':
deviceData.setKeyBoardData(bytearray([0x01, 0x00, 0x00, hid_code, 0x00, 0x00, 0x00, 0x00, 0x00])) deviceData.setKeyBoardData(bytearray([0x01, 0x00, 0x00, hid_code, 0x00, 0x00, 0x00, 0x00, 0x00]))
hid_device.send_data(deviceData.getKeyBoardData()) hid_device.send_report_on_interrupt(deviceData.getKeyBoardData())
elif message_type == "mousemove": elif message_type == "mousemove":
x = parsed['x'] x = parsed['x']
if x > 127: if x > 127:
@@ -469,7 +469,7 @@ async def keyboard_device(hid_device, command):
x_cord = x.to_bytes(signed = True) x_cord = x.to_bytes(signed = True)
y_cord = y.to_bytes(signed = True) y_cord = y.to_bytes(signed = True)
deviceData.setMouseData(bytearray([0x02, 0x00]) + x_cord + y_cord) deviceData.setMouseData(bytearray([0x02, 0x00]) + x_cord + y_cord)
hid_device.send_data(deviceData.getMouseData()) hid_device.send_report_on_interrupt(deviceData.getMouseData())
except websockets.exceptions.ConnectionClosedOK: except websockets.exceptions.ConnectionClosedOK:
pass pass
@@ -554,7 +554,7 @@ async def main():
def on_set_protocol_cb(protocol): def on_set_protocol_cb(protocol):
retValue = hid_device.GetSetStatus() retValue = hid_device.GetSetStatus()
#We do not support SET_PROTOCOL #We do not support SET_PROTOCOL.
print("SET_PROTOCOL report_id: " + str(protocol)) print("SET_PROTOCOL report_id: " + str(protocol))
retValue.status=hid_device.GetSetReturn.ERR_UNSUPPORTED_REQUEST retValue.status=hid_device.GetSetReturn.ERR_UNSUPPORTED_REQUEST
return retValue return retValue
@@ -603,12 +603,12 @@ async def main():
print(" 2. Connect Interrupt Channel") print(" 2. Connect Interrupt Channel")
print(" 3. Disconnect Control Channel") print(" 3. Disconnect Control Channel")
print(" 4. Disconnect Interrupt Channel") print(" 4. Disconnect Interrupt Channel")
print(" 5. Send Report") print(" 5. Send Report on Interrupt Channel")
print(" 6. Virtual Cable Unplug") print(" 6. Virtual Cable Unplug")
print(" 7. Disconnect device") print(" 7. Disconnect device")
print(" 8. Delete Bonding") print(" 8. Delete Bonding")
print(" 9. Re-connect to device") print(" 9. Re-connect to device")
print("10. Exit Application") print("10. Exit ")
print("\nEnter your choice : \n") print("\nEnter your choice : \n")
choice = await reader.readline() choice = await reader.readline()
@@ -629,21 +629,28 @@ async def main():
elif choice == '5': elif choice == '5':
print(" 1. Report ID 0x01") print(" 1. Report ID 0x01")
print(" 2. Report ID 0x02") print(" 2. Report ID 0x02")
print(" 3. Invalid Report ID")
choice1 = await reader.readline() choice1 = await reader.readline()
choice1 = choice1.decode('utf-8').strip() choice1 = choice1.decode('utf-8').strip()
if choice1 == '1': if choice1 == '1':
data = bytearray([0x01, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00]) data = bytearray([0x01, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00])
hid_device.send_data(data) hid_device.send_report_on_interrupt(data)
data = bytearray([0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]) data = bytearray([0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
hid_device.send_data(data) hid_device.send_report_on_interrupt(data)
elif choice1 == '2': elif choice1 == '2':
data = bytearray([0x02, 0x00, 0x00, 0xf6]) data = bytearray([0x02, 0x00, 0x00, 0xf6])
hid_device.send_data(data) hid_device.send_report_on_interrupt(data)
data = bytearray([0x02, 0x00, 0x00, 0x00]) data = bytearray([0x02, 0x00, 0x00, 0x00])
hid_device.send_data(data) hid_device.send_report_on_interrupt(data)
elif choice1 == '3':
data = bytearray([0x00, 0x00, 0x00, 0x00])
hid_device.send_report_on_interrupt(data)
data = bytearray([0x00, 0x00, 0x00, 0x00])
hid_device.send_report_on_interrupt(data)
else: else:
print('Incorrect option selected') print('Incorrect option selected')
@@ -668,7 +675,7 @@ async def main():
hid_host_bd_addr = str(hid_device.remote_device_bd_address) hid_host_bd_addr = str(hid_device.remote_device_bd_address)
await device.keystore.delete(hid_host_bd_addr) await device.keystore.delete(hid_host_bd_addr)
except KeyError: except KeyError:
print('Device not found or Device already unpaired.') print('Device NOT found or Device already unpaired.')
elif choice == '9': elif choice == '9':
hid_host_bd_addr = str(hid_device.remote_device_bd_address) hid_host_bd_addr = str(hid_device.remote_device_bd_address)
@@ -677,7 +684,7 @@ async def main():
await connection.encrypt() await connection.encrypt()
elif choice == '10': elif choice == '10':
sys.exit("Application exit successful") sys.exit("Exit successful")
else: else:
print("Invalid option selected.") print("Invalid option selected.")
@@ -698,6 +705,7 @@ async def main():
await keyboard_device(hid_device, 'web') await keyboard_device(hid_device, 'web')
else: else:
#default option is using keyboard.html (web)
await keyboard_device(hid_device, 'web') await keyboard_device(hid_device, 'web')
await hci_source.wait_for_termination() await hci_source.wait_for_termination()

View File

@@ -377,14 +377,14 @@ async def main():
print(" 6. Set Report") print(" 6. Set Report")
print(" 7. Set Protocol Mode") print(" 7. Set Protocol Mode")
print(" 8. Get Protocol Mode") print(" 8. Get Protocol Mode")
print(" 9. Send Report") print(" 9. Send Report on Interrupt Channel")
print("10. Suspend") print("10. Suspend")
print("11. Exit Suspend") print("11. Exit Suspend")
print("12. Virtual Cable Unplug") print("12. Virtual Cable Unplug")
print("13. Disconnect device") print("13. Disconnect device")
print("14. Delete Bonding") print("14. Delete Bonding")
print("15. Re-connect to device") print("15. Re-connect to device")
print("16. Exit Application") print("16. Exit")
print("\nEnter your choice : \n") print("\nEnter your choice : \n")
choice = await reader.readline() choice = await reader.readline()
@@ -403,28 +403,31 @@ async def main():
await hid_host.disconnect_interrupt_channel() await hid_host.disconnect_interrupt_channel()
elif choice == '5': elif choice == '5':
print(" 1. Report ID 0x02 - Input, Mouse") print(" 1. Input Report with ID 0x01")
print(" 2. Report ID 0x03 - Input, Keyboard") print(" 2. Input Report with ID 0x02")
print(" 3. Report ID 0x05 - Input, Invalid ReportId") print(" 3. Input Report with ID 0x0F - Invalid ReportId")
print(" 4. Report ID 0x02 - Output") print(" 4. Output Report with ID 0x02")
print(" 5. Report ID 0x05 - Feature") print(" 5. Feature Report with ID 0x05 - Unsupported Request")
print(" 6. Input Report with ID 0x02, BufferSize 3")
print(" 7. Output Report with ID 0x03, BufferSize 2")
print(" 8. Feature Report with ID 0x05, BufferSize 3")
choice1 = await reader.readline() choice1 = await reader.readline()
choice1 = choice1.decode('utf-8').strip() choice1 = choice1.decode('utf-8').strip()
if choice1 == '1': if choice1 == '1':
hid_host.get_report(1, 2, 0) hid_host.get_report(1, 1, 0)
elif choice1 == '2': elif choice1 == '2':
hid_host.get_report(1, 1, 0) hid_host.get_report(1, 2, 0)
elif choice1 == '3': elif choice1 == '3':
hid_host.get_report(1, 5, 0) hid_host.get_report(1, 5, 0)
elif choice1 == '4': elif choice1 == '4':
hid_host.get_report(2, 1, 0) hid_host.get_report(2, 2, 0)
elif choice1 == '5': elif choice1 == '5':
hid_host.get_report(3, 5, 0) hid_host.get_report(3, 15, 0)
elif choice1 == '6': elif choice1 == '6':
hid_host.get_report(1, 2, 3) hid_host.get_report(1, 2, 3)
@@ -490,11 +493,11 @@ async def main():
data = bytearray( data = bytearray(
[0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00] [0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]
) )
hid_host.send_data(data) hid_host.send_report_on_interrupt(data)
elif choice1 == '2': elif choice1 == '2':
data = bytearray([0x03, 0x00, 0x0D, 0xFD, 0x00, 0x00]) data = bytearray([0x03, 0x00, 0x0D, 0xFD, 0x00, 0x00])
hid_host.send_data(data) hid_host.send_report_on_interrupt(data)
else: else:
print('Incorrect option selected') print('Incorrect option selected')
@@ -540,7 +543,7 @@ async def main():
await connection.encrypt() await connection.encrypt()
elif choice == '16': elif choice == '16':
sys.exit("Application exit successful") sys.exit("Exit successful")
else: else:
print("Invalid option selected.") print("Invalid option selected.")