forked from auracaster/bumble_mirror
Added GET_REPORT and SET_REPORT changes
Added changes to handle invalid cases
This commit is contained in:
@@ -380,6 +380,7 @@ class Device(HID):
|
|||||||
REPORT_ID_NOT_FOUND = 0x01
|
REPORT_ID_NOT_FOUND = 0x01
|
||||||
ERR_UNSUPPORTED_REQUEST = 0x02
|
ERR_UNSUPPORTED_REQUEST = 0x02
|
||||||
ERR_UNKNOWN = 0x03
|
ERR_UNKNOWN = 0x03
|
||||||
|
ERR_INVALID_PARAMETER = 0x04
|
||||||
SUCCESS = 0xFF
|
SUCCESS = 0xFF
|
||||||
|
|
||||||
class GetSetStatus:
|
class GetSetStatus:
|
||||||
@@ -433,6 +434,8 @@ class Device(HID):
|
|||||||
|
|
||||||
elif ret.status == self.GetSetReturn.REPORT_ID_NOT_FOUND:
|
elif ret.status == self.GetSetReturn.REPORT_ID_NOT_FOUND:
|
||||||
self.send_handshake_message(Message.Handshake.ERR_INVALID_REPORT_ID)
|
self.send_handshake_message(Message.Handshake.ERR_INVALID_REPORT_ID)
|
||||||
|
elif ret.status == self.GetSetReturn.ERR_INVALID_PARAMETER:
|
||||||
|
self.send_handshake_message(Message.Handshake.ERR_INVALID_PARAMETER)
|
||||||
elif ret.status == self.GetSetReturn.ERR_UNSUPPORTED_REQUEST:
|
elif ret.status == self.GetSetReturn.ERR_UNSUPPORTED_REQUEST:
|
||||||
self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST)
|
self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST)
|
||||||
else:
|
else:
|
||||||
@@ -448,14 +451,19 @@ class Device(HID):
|
|||||||
report_type = pdu[0] & 0x03
|
report_type = pdu[0] & 0x03
|
||||||
report_id = pdu[1]
|
report_id = pdu[1]
|
||||||
report_data = pdu[2:]
|
report_data = pdu[2:]
|
||||||
ret = self.set_report_cb(report_id, report_type, report_data)
|
report_size = len(pdu[1:])
|
||||||
|
ret = self.set_report_cb(report_id, report_type, report_size, report_data)
|
||||||
if ret.status == self.GetSetReturn.SUCCESS:
|
if ret.status == self.GetSetReturn.SUCCESS:
|
||||||
self.send_handshake_message(Message.Handshake.SUCCESSFUL)
|
self.send_handshake_message(Message.Handshake.SUCCESSFUL)
|
||||||
return
|
elif ret.status == self.GetSetReturn.ERR_INVALID_PARAMETER:
|
||||||
|
self.send_handshake_message(Message.Handshake.ERR_INVALID_PARAMETER)
|
||||||
|
elif ret.status == self.GetSetReturn.REPORT_ID_NOT_FOUND:
|
||||||
|
self.send_handshake_message(Message.Handshake.ERR_INVALID_REPORT_ID)
|
||||||
|
else:
|
||||||
|
self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST)
|
||||||
else:
|
else:
|
||||||
logger.debug("SetReport callback not registered !!")
|
logger.debug("SetReport callback not registered !!")
|
||||||
|
self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST)
|
||||||
self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST)
|
|
||||||
|
|
||||||
def register_set_report_cb(self, cb):
|
def register_set_report_cb(self, cb):
|
||||||
self.set_report_cb = cb
|
self.set_report_cb = cb
|
||||||
|
|||||||
@@ -418,8 +418,8 @@ async def get_stream_reader(pipe) -> asyncio.StreamReader:
|
|||||||
|
|
||||||
class DeviceData:
|
class DeviceData:
|
||||||
def __init__(self) -> None:
|
def __init__(self) -> None:
|
||||||
self.keyboardData = bytearray([0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
|
self.keyboardData = bytearray([0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
|
||||||
self.mouseData = bytearray([0x00, 0x00, 0x00, 0x00])
|
self.mouseData = bytearray([0x02, 0x00, 0x00, 0x00])
|
||||||
|
|
||||||
# Device's live data - Mouse and Keyboard will be stored in this
|
# Device's live data - Mouse and Keyboard will be stored in this
|
||||||
deviceData = DeviceData()
|
deviceData = DeviceData()
|
||||||
@@ -431,7 +431,6 @@ async def keyboard_device(hid_device, command):
|
|||||||
# Start a Websocket server to receive events from a web page
|
# Start a Websocket server to receive events from a web page
|
||||||
async def serve(websocket, _path):
|
async def serve(websocket, _path):
|
||||||
global deviceData
|
global deviceData
|
||||||
# global mouseData
|
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
message = await websocket.recv()
|
message = await websocket.recv()
|
||||||
@@ -445,25 +444,24 @@ async def keyboard_device(hid_device, command):
|
|||||||
code = ord(key)
|
code = ord(key)
|
||||||
if ord('a') <= code <= ord('z'):
|
if ord('a') <= code <= ord('z'):
|
||||||
hid_code = 0x04 + code - ord('a')
|
hid_code = 0x04 + code - ord('a')
|
||||||
deviceData.setKeyBoardData(bytearray([0x01, 0x00, 0x00, hid_code, 0x00, 0x00, 0x00, 0x00, 0x00]))
|
deviceData.keyboardData = bytearray([0x01, 0x00, 0x00, hid_code, 0x00, 0x00, 0x00, 0x00, 0x00])
|
||||||
hid_device.send_report_on_interrupt(deviceData.getKeyBoardData())
|
hid_device.send_data(deviceData.keyboardData)
|
||||||
elif message_type == 'keyup':
|
elif message_type == 'keyup':
|
||||||
deviceData.setKeyBoardData(bytearray([0x01, 0x00, 0x00, hid_code, 0x00, 0x00, 0x00, 0x00, 0x00]))
|
deviceData.keyboardData = bytearray([0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
|
||||||
hid_device.send_report_on_interrupt(deviceData.getKeyBoardData())
|
hid_device.send_data(deviceData.keyboardData)
|
||||||
elif message_type == "mousemove":
|
elif message_type == "mousemove":
|
||||||
# logical min and max values
|
# logical min and max values
|
||||||
log_min = -127
|
log_min = -127
|
||||||
log_max = 127
|
log_max = 127
|
||||||
x = parsed['x']
|
x = parsed['x']
|
||||||
y = parsed['y']
|
y = parsed['y']
|
||||||
if y > 127:
|
# limiting x and y values within logical max and min range
|
||||||
y = 127
|
x = max(log_min, min(log_max, x))
|
||||||
elif y < -127:
|
y = max(log_min, min(log_max, y))
|
||||||
y = -127
|
|
||||||
x_cord = x.to_bytes(signed = True)
|
x_cord = x.to_bytes(signed = True)
|
||||||
y_cord = y.to_bytes(signed = True)
|
y_cord = y.to_bytes(signed = True)
|
||||||
deviceData.setMouseData(bytearray([0x02, 0x00]) + x_cord + y_cord)
|
deviceData.mouseData = bytearray([0x02, 0x00]) + x_cord + y_cord
|
||||||
hid_device.send_report_on_interrupt(deviceData.getMouseData())
|
hid_device.send_data(deviceData.mouseData)
|
||||||
except websockets.exceptions.ConnectionClosedOK:
|
except websockets.exceptions.ConnectionClosedOK:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@@ -505,10 +503,10 @@ async def main():
|
|||||||
"buffer_size:" + str(buffer_size))
|
"buffer_size:" + str(buffer_size))
|
||||||
if report_type == Message.ReportType.INPUT_REPORT:
|
if report_type == Message.ReportType.INPUT_REPORT:
|
||||||
if report_id == 1:
|
if report_id == 1:
|
||||||
retValue.data = deviceData.keyboardData
|
retValue.data = deviceData.keyboardData[1:]
|
||||||
retValue.status = hid_device.GetSetReturn.SUCCESS
|
retValue.status = hid_device.GetSetReturn.SUCCESS
|
||||||
elif report_id == 2:
|
elif report_id == 2:
|
||||||
retValue.data = deviceData.mouseData
|
retValue.data = deviceData.mouseData[1:]
|
||||||
retValue.status = hid_device.GetSetReturn.SUCCESS
|
retValue.status = hid_device.GetSetReturn.SUCCESS
|
||||||
else:
|
else:
|
||||||
retValue.status = hid_device.GetSetReturn.REPORT_ID_NOT_FOUND
|
retValue.status = hid_device.GetSetReturn.REPORT_ID_NOT_FOUND
|
||||||
@@ -521,21 +519,34 @@ async def main():
|
|||||||
#testing, we will return single byte random data.
|
#testing, we will return single byte random data.
|
||||||
retValue.data = bytearray([0x11])
|
retValue.data = bytearray([0x11])
|
||||||
retValue.status = hid_device.GetSetReturn.SUCCESS
|
retValue.status = hid_device.GetSetReturn.SUCCESS
|
||||||
|
|
||||||
elif report_type == Message.ReportType.FEATURE_REPORT:
|
elif report_type == Message.ReportType.FEATURE_REPORT:
|
||||||
# TBD - not requried for PTS testing
|
retValue.status = hid_device.GetSetReturn.ERR_INVALID_PARAMETER
|
||||||
retValue.status = hid_device.GetSetReturn.ERR_UNSUPPORTED_REQUEST
|
elif report_type == Message.ReportType.OTHER_REPORT:
|
||||||
|
if report_id == 3:
|
||||||
|
retValue.status = hid_device.GetSetReturn.REPORT_ID_NOT_FOUND
|
||||||
else:
|
else:
|
||||||
retValue.status = hid_device.GetSetReturn.FAILURE
|
retValue.status = hid_device.GetSetReturn.FAILURE
|
||||||
|
|
||||||
return retValue
|
return retValue
|
||||||
|
|
||||||
def on_set_report_cb(report_id, report_type, data):
|
def on_set_report_cb(report_id, report_type, report_size, data):
|
||||||
retValue = hid_device.GetSetStatus()
|
retValue = hid_device.GetSetStatus()
|
||||||
print("SET_REPORT report_id: " + str(report_id) +"report_type: "+ str(report_type)+
|
print("SET_REPORT report_id: " + str(report_id) +"report_type: "+ str(report_type)+
|
||||||
"data:" + str(data))
|
"report_size " + str(report_size) + "data:" + str(data))
|
||||||
retValue.status = hid_device.GetSetReturn.SUCCESS
|
if report_type == Message.ReportType.FEATURE_REPORT:
|
||||||
|
retValue.status = hid_device.GetSetReturn.ERR_INVALID_PARAMETER
|
||||||
|
elif report_type == Message.ReportType.INPUT_REPORT:
|
||||||
|
if report_id == 1 and report_size != len(deviceData.keyboardData):
|
||||||
|
retValue.status = hid_device.GetSetReturn.ERR_INVALID_PARAMETER
|
||||||
|
elif report_id == 2 and report_size != len(deviceData.mouseData):
|
||||||
|
retValue.status = hid_device.GetSetReturn.ERR_INVALID_PARAMETER
|
||||||
|
elif report_id == 3:
|
||||||
|
retValue.status = hid_device.GetSetReturn.REPORT_ID_NOT_FOUND
|
||||||
|
else:
|
||||||
|
retValue.status = hid_device.GetSetReturn.SUCCESS
|
||||||
|
else:
|
||||||
|
retValue.status = hid_device.GetSetReturn.SUCCESS
|
||||||
|
|
||||||
return retValue
|
return retValue
|
||||||
|
|
||||||
def on_get_protocol_cb():
|
def on_get_protocol_cb():
|
||||||
|
|||||||
Reference in New Issue
Block a user