Project format and lint error fix. Redefination if Device class needs to be discussed

This commit is contained in:
skarnataki
2023-11-27 13:04:54 +00:00
parent f47b9178ad
commit 07f71fc895
2 changed files with 68 additions and 27 deletions

View File

@@ -274,8 +274,10 @@ class HID(EventEmitter):
await channel.disconnect() await channel.disconnect()
def on_device_connection(self, connection: Connection) -> None: def on_device_connection(self, connection: Connection) -> None:
self.connection = connection self.connection = connection # type: ignore[assignment]
self.remote_device_bd_address = connection.peer_address self.remote_device_bd_address = (
connection.peer_address
) # type: ignore[assignment]
connection.on('disconnection', self.on_disconnection) connection.on('disconnection', self.on_disconnection)
def on_connection(self, l2cap_channel: l2cap.ClassicChannel) -> None: def on_connection(self, l2cap_channel: l2cap.ClassicChannel) -> None:
@@ -312,16 +314,16 @@ class HID(EventEmitter):
self.emit('handshake', Message.Handshake(param)) self.emit('handshake', Message.Handshake(param))
elif message_type == Message.MessageType.GET_REPORT: elif message_type == Message.MessageType.GET_REPORT:
logger.debug('<<< HID GET REPORT') logger.debug('<<< HID GET REPORT')
self.handle_get_report(pdu) self.handle_get_report(pdu) # type: ignore[attr-defined]
elif message_type == Message.MessageType.SET_REPORT: elif message_type == Message.MessageType.SET_REPORT:
logger.debug('<<< HID SET REPORT') logger.debug('<<< HID SET REPORT')
self.handle_set_report(pdu) self.handle_set_report(pdu) # type: ignore[attr-defined]
elif message_type == Message.MessageType.GET_PROTOCOL: elif message_type == Message.MessageType.GET_PROTOCOL:
logger.debug('<<< HID GET PROTOCOL') logger.debug('<<< HID GET PROTOCOL')
self.handle_get_protocol(pdu) self.handle_get_protocol(pdu) # type: ignore[attr-defined]
elif message_type == Message.MessageType.SET_PROTOCOL: elif message_type == Message.MessageType.SET_PROTOCOL:
logger.debug('<<< HID SET PROTOCOL') logger.debug('<<< HID SET PROTOCOL')
self.handle_set_protocol(pdu) self.handle_set_protocol(pdu) # type: ignore[attr-defined]
elif message_type == Message.MessageType.DATA: elif message_type == Message.MessageType.DATA:
logger.debug('<<< HID CONTROL DATA') logger.debug('<<< HID CONTROL DATA')
self.emit('control_data', pdu) self.emit('control_data', pdu)
@@ -339,7 +341,9 @@ class HID(EventEmitter):
logger.debug('<<< HID CONTROL OPERATION UNSUPPORTED') logger.debug('<<< HID CONTROL OPERATION UNSUPPORTED')
else: else:
logger.debug('<<< HID MESSAGE TYPE UNSUPPORTED') logger.debug('<<< HID MESSAGE TYPE UNSUPPORTED')
self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST) self.send_handshake_message(
Message.Handshake.ERR_UNSUPPORTED_REQUEST
) # type: ignore[attr-defined]
def on_intr_pdu(self, pdu: bytes) -> None: def on_intr_pdu(self, pdu: bytes) -> None:
logger.debug(f'<<< HID INTERRUPT PDU: {pdu.hex()}') logger.debug(f'<<< HID INTERRUPT PDU: {pdu.hex()}')
@@ -374,7 +378,7 @@ class HID(EventEmitter):
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class Device(HID): class Device(HID): # type: ignore[no-redef]
class GetSetReturn(enum.IntEnum): class GetSetReturn(enum.IntEnum):
FAILURE = 0x00 FAILURE = 0x00
REPORT_ID_NOT_FOUND = 0x01 REPORT_ID_NOT_FOUND = 0x01
@@ -385,8 +389,8 @@ class Device(HID):
class GetSetStatus: class GetSetStatus:
def __init__(self) -> None: def __init__(self) -> None:
self.data: bytes
self.status = 0 self.status = 0
self.data = None
def __init__(self, device: Device) -> None: def __init__(self, device: Device) -> None:
super().__init__(device, HID.Role.DEVICE) super().__init__(device, HID.Role.DEVICE)
@@ -419,7 +423,9 @@ class Device(HID):
buffer_size = 0 buffer_size = 0
if self.get_report_cb != None: if self.get_report_cb != None:
ret = self.get_report_cb(report_id, report_type, buffer_size) ret = self.get_report_cb(
report_id, report_type, buffer_size
) # type: ignore
if ret.status == self.GetSetReturn.FAILURE: if ret.status == self.GetSetReturn.FAILURE:
self.send_handshake_message(Message.Handshake.ERR_UNKNOWN) self.send_handshake_message(Message.Handshake.ERR_UNKNOWN)
@@ -427,7 +433,9 @@ class Device(HID):
data = bytearray() data = bytearray()
data.append(report_id) data.append(report_id)
data.extend(ret.data) data.extend(ret.data)
if len(data) < self.l2cap_ctrl_channel.mtu: if (
len(data) < self.l2cap_ctrl_channel.mtu
): # type: ignore[union-attr]
self.send_control_data(report_type=report_type, data=data) self.send_control_data(report_type=report_type, data=data)
else: else:
self.send_handshake_message(Message.Handshake.ERR_INVALID_PARAMETER) self.send_handshake_message(Message.Handshake.ERR_INVALID_PARAMETER)
@@ -452,7 +460,9 @@ class Device(HID):
report_id = pdu[1] report_id = pdu[1]
report_data = pdu[2:] report_data = pdu[2:]
report_size = len(pdu[1:]) report_size = len(pdu[1:])
ret = self.set_report_cb(report_id, report_type, report_size, report_data) ret = self.set_report_cb(
report_id, report_type, report_size, report_data
) # type: ignore
if ret.status == self.GetSetReturn.SUCCESS: if ret.status == self.GetSetReturn.SUCCESS:
self.send_handshake_message(Message.Handshake.SUCCESSFUL) self.send_handshake_message(Message.Handshake.SUCCESSFUL)
elif ret.status == self.GetSetReturn.ERR_INVALID_PARAMETER: elif ret.status == self.GetSetReturn.ERR_INVALID_PARAMETER:
@@ -472,7 +482,7 @@ class Device(HID):
def handle_get_protocol(self, pdu: bytes): def handle_get_protocol(self, pdu: bytes):
ret = self.GetSetStatus() ret = self.GetSetStatus()
if self.get_protocol_cb != None: if self.get_protocol_cb != None:
ret = self.get_protocol_cb() ret = self.get_protocol_cb() # type: ignore
if ret.status == self.GetSetReturn.SUCCESS: if ret.status == self.GetSetReturn.SUCCESS:
self.send_control_data(Message.ReportType.OTHER_REPORT, ret.data) self.send_control_data(Message.ReportType.OTHER_REPORT, ret.data)
return return
@@ -488,7 +498,7 @@ class Device(HID):
def handle_set_protocol(self, pdu: bytes): def handle_set_protocol(self, pdu: bytes):
ret = self.GetSetStatus() ret = self.GetSetStatus()
if self.set_protocol_cb != None: if self.set_protocol_cb != None:
ret = self.set_protocol_cb(pdu[0] & 0x01) ret = self.set_protocol_cb(pdu[0] & 0x01) # type: ignore
if ret.status == self.GetSetReturn.SUCCESS: if ret.status == self.GetSetReturn.SUCCESS:
return return
else: else:

View File

@@ -418,9 +418,12 @@ async def get_stream_reader(pipe) -> asyncio.StreamReader:
class DeviceData: class DeviceData:
def __init__(self) -> None: def __init__(self) -> None:
self.keyboardData = bytearray([0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]) self.keyboardData = bytearray(
[0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]
)
self.mouseData = bytearray([0x02, 0x00, 0x00, 0x00]) self.mouseData = bytearray([0x02, 0x00, 0x00, 0x00])
# Device's live data - Mouse and Keyboard will be stored in this # Device's live data - Mouse and Keyboard will be stored in this
deviceData = DeviceData() deviceData = DeviceData()
@@ -444,10 +447,24 @@ async def keyboard_device(hid_device, command):
code = ord(key) code = ord(key)
if ord('a') <= code <= ord('z'): if ord('a') <= code <= ord('z'):
hid_code = 0x04 + code - ord('a') hid_code = 0x04 + code - ord('a')
deviceData.keyboardData = bytearray([0x01, 0x00, 0x00, hid_code, 0x00, 0x00, 0x00, 0x00, 0x00]) deviceData.keyboardData = bytearray(
[
0x01,
0x00,
0x00,
hid_code,
0x00,
0x00,
0x00,
0x00,
0x00,
]
)
hid_device.send_data(deviceData.keyboardData) hid_device.send_data(deviceData.keyboardData)
elif message_type == 'keyup': elif message_type == 'keyup':
deviceData.keyboardData = bytearray([0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]) deviceData.keyboardData = bytearray(
[0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]
)
hid_device.send_data(deviceData.keyboardData) hid_device.send_data(deviceData.keyboardData)
elif message_type == "mousemove": elif message_type == "mousemove":
# logical min and max values # logical min and max values
@@ -458,8 +475,8 @@ async def keyboard_device(hid_device, command):
# limiting x and y values within logical max and min range # limiting x and y values within logical max and min range
x = max(log_min, min(log_max, x)) x = max(log_min, min(log_max, x))
y = max(log_min, min(log_max, y)) y = max(log_min, min(log_max, y))
x_cord = x.to_bytes(signed = True) x_cord = x.to_bytes(signed=True)
y_cord = y.to_bytes(signed = True) y_cord = y.to_bytes(signed=True)
deviceData.mouseData = bytearray([0x02, 0x00]) + x_cord + y_cord deviceData.mouseData = bytearray([0x02, 0x00]) + x_cord + y_cord
hid_device.send_data(deviceData.mouseData) hid_device.send_data(deviceData.mouseData)
except websockets.exceptions.ConnectionClosedOK: except websockets.exceptions.ConnectionClosedOK:
@@ -499,8 +516,14 @@ async def main():
def on_get_report_cb(report_id, report_type, buffer_size): def on_get_report_cb(report_id, report_type, buffer_size):
retValue = hid_device.GetSetStatus() retValue = hid_device.GetSetStatus()
print("GET_REPORT report_id: " + str(report_id) +"report_type: "+ str(report_type)+ print(
"buffer_size:" + str(buffer_size)) "GET_REPORT report_id: "
+ str(report_id)
+ "report_type: "
+ str(report_type)
+ "buffer_size:"
+ str(buffer_size)
)
if report_type == Message.ReportType.INPUT_REPORT: if report_type == Message.ReportType.INPUT_REPORT:
if report_id == 1: if report_id == 1:
retValue.data = deviceData.keyboardData[1:] retValue.data = deviceData.keyboardData[1:]
@@ -515,8 +538,8 @@ async def main():
data_len = buffer_size - 1 data_len = buffer_size - 1
retValue.data = retValue.data[:data_len] retValue.data = retValue.data[:data_len]
elif report_type == Message.ReportType.OUTPUT_REPORT: elif report_type == Message.ReportType.OUTPUT_REPORT:
#This sample app has nothing to do with the report received, to enable PTS # This sample app has nothing to do with the report received, to enable PTS
#testing, we will return single byte random data. # testing, we will return single byte random data.
retValue.data = bytearray([0x11]) retValue.data = bytearray([0x11])
retValue.status = hid_device.GetSetReturn.SUCCESS retValue.status = hid_device.GetSetReturn.SUCCESS
elif report_type == Message.ReportType.FEATURE_REPORT: elif report_type == Message.ReportType.FEATURE_REPORT:
@@ -531,15 +554,23 @@ async def main():
def on_set_report_cb(report_id, report_type, report_size, data): def on_set_report_cb(report_id, report_type, report_size, data):
retValue = hid_device.GetSetStatus() retValue = hid_device.GetSetStatus()
print("SET_REPORT report_id: " + str(report_id) +"report_type: "+ str(report_type)+ print(
"report_size " + str(report_size) + "data:" + str(data)) "SET_REPORT report_id: "
+ str(report_id)
+ "report_type: "
+ str(report_type)
+ "report_size "
+ str(report_size)
+ "data:"
+ str(data)
)
if report_type == Message.ReportType.FEATURE_REPORT: if report_type == Message.ReportType.FEATURE_REPORT:
retValue.status = hid_device.GetSetReturn.ERR_INVALID_PARAMETER retValue.status = hid_device.GetSetReturn.ERR_INVALID_PARAMETER
elif report_type == Message.ReportType.INPUT_REPORT: elif report_type == Message.ReportType.INPUT_REPORT:
if report_id == 1 and report_size != len(deviceData.keyboardData): if report_id == 1 and report_size != len(deviceData.keyboardData):
retValue.status = hid_device.GetSetReturn.ERR_INVALID_PARAMETER retValue.status = hid_device.GetSetReturn.ERR_INVALID_PARAMETER
elif report_id == 2 and report_size != len(deviceData.mouseData): elif report_id == 2 and report_size != len(deviceData.mouseData):
retValue.status = hid_device.GetSetReturn.ERR_INVALID_PARAMETER retValue.status = hid_device.GetSetReturn.ERR_INVALID_PARAMETER
elif report_id == 3: elif report_id == 3:
retValue.status = hid_device.GetSetReturn.REPORT_ID_NOT_FOUND retValue.status = hid_device.GetSetReturn.REPORT_ID_NOT_FOUND
else: else: