forked from auracaster/bumble_mirror
keyboard data moved to DeviceData class
This commit is contained in:
+44
-28
@@ -92,8 +92,15 @@ HID_COUNTRY_CODE = 0x21
|
||||
HID_VIRTUAL_CABLE = True
|
||||
HID_RECONNECT_INITIATE = True
|
||||
REPORT_DESCRIPTOR_TYPE = 0x22
|
||||
|
||||
|
||||
HID_LANGID_BASE_LANGUAGE = 0x0409
|
||||
HID_LANGID_BASE_BLUETOOTH_STRING_OFFSET = 0x100
|
||||
HID_BATTERY_POWER = True
|
||||
HID_REMOTE_WAKE = True
|
||||
HID_SUPERVISION_TIMEOUT = 0xC80
|
||||
HID_NORMALLY_CONNECTABLE = True
|
||||
HID_BOOT_DEVICE = True
|
||||
HID_SSR_HOST_MAX_LATENCY = 0x640
|
||||
HID_SSR_HOST_MIN_TIMEOUT = 0xC80
|
||||
HID_REPORT_MAP = bytes(
|
||||
# pylint: disable=line-too-long
|
||||
[
|
||||
@@ -216,15 +223,7 @@ HID_REPORT_MAP = bytes(
|
||||
0xC0, # End Collection
|
||||
]
|
||||
)
|
||||
HID_LANGID_BASE_LANGUAGE = 0x0409
|
||||
HID_LANGID_BASE_BLUETOOTH_STRING_OFFSET = 0x100
|
||||
HID_BATTERY_POWER = True
|
||||
HID_REMOTE_WAKE = True
|
||||
HID_SUPERVISION_TIMEOUT = 0xC80
|
||||
HID_NORMALLY_CONNECTABLE = True
|
||||
HID_BOOT_DEVICE = True
|
||||
HID_SSR_HOST_MAX_LATENCY = 0x640
|
||||
HID_SSR_HOST_MIN_TIMEOUT = 0xC80
|
||||
|
||||
|
||||
# Default protocol mode set to report protocol
|
||||
protocol_mode = Message.ProtocolMode.REPORT_PROTOCOL
|
||||
@@ -409,17 +408,34 @@ async def get_stream_reader(pipe) -> asyncio.StreamReader:
|
||||
await loop.connect_read_pipe(lambda: protocol, pipe)
|
||||
return reader
|
||||
|
||||
class DeviceData:
|
||||
def __init__(self) -> None:
|
||||
self.keyboardData=bytearray([0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
|
||||
self.mouseData=bytearray([0x00, 0x00, 0x00, 0x00])
|
||||
|
||||
def getKeyBoardData(self):
|
||||
return self.keyboardData
|
||||
|
||||
def getMouseData(self):
|
||||
return self.mouseData
|
||||
|
||||
def setKeyBoardData(self, data):
|
||||
self.keyboardData=data
|
||||
|
||||
def setMouseData(self, data):
|
||||
self.mouseData=data
|
||||
|
||||
keyboardData=bytearray([0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
|
||||
mouseData=bytearray([0x00, 0x00, 0x00, 0x00])
|
||||
#Device's live data - Mouse and Keyboard will be stored in this
|
||||
deviceData=DeviceData()
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def keyboard_device(hid_device, command):
|
||||
|
||||
if command == 'web':
|
||||
# Start a Websocket server to receive events from a web page
|
||||
async def serve(websocket, _path):
|
||||
global keyboardData
|
||||
global mouseData
|
||||
global deviceData
|
||||
#global mouseData
|
||||
while True:
|
||||
try:
|
||||
message = await websocket.recv()
|
||||
@@ -433,11 +449,11 @@ async def keyboard_device(hid_device, command):
|
||||
code = ord(key)
|
||||
if ord('a') <= code <= ord('z'):
|
||||
hid_code = 0x04 + code - ord('a')
|
||||
keyboardData = bytearray([0x01, 0x00, 0x00, hid_code, 0x00, 0x00, 0x00, 0x00, 0x00])
|
||||
hid_device.send_data(keyboardData)
|
||||
deviceData.setKeyBoardData(bytearray([0x01, 0x00, 0x00, hid_code, 0x00, 0x00, 0x00, 0x00, 0x00]))
|
||||
hid_device.send_data(deviceData.getKeyBoardData())
|
||||
elif message_type == 'keyup':
|
||||
keyboardData = bytearray([0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
|
||||
hid_device.send_data(keyboardData)
|
||||
deviceData.setKeyBoardData(bytearray([0x01, 0x00, 0x00, hid_code, 0x00, 0x00, 0x00, 0x00, 0x00]))
|
||||
hid_device.send_data(deviceData.getKeyBoardData())
|
||||
elif message_type == "mousemove":
|
||||
x = parsed['x']
|
||||
if x > 127:
|
||||
@@ -451,8 +467,8 @@ async def keyboard_device(hid_device, command):
|
||||
y = -127
|
||||
x_cord = x.to_bytes(signed = True)
|
||||
y_cord = y.to_bytes(signed = True)
|
||||
mouseData = bytearray([0x02, 0x00]) + x_cord + y_cord
|
||||
hid_device.send_data(mouseData)
|
||||
deviceData.setMouseData(bytearray([0x02, 0x00]) + x_cord + y_cord)
|
||||
hid_device.send_data(deviceData.getKeyBoardData())
|
||||
except websockets.exceptions.ConnectionClosedOK:
|
||||
pass
|
||||
|
||||
@@ -467,12 +483,12 @@ async def keyboard_device(hid_device, command):
|
||||
|
||||
# Keypress for the letter
|
||||
keycode = 0x04 + letter - 0x61
|
||||
keyboardData = bytearray([0x01, 0x00, 0x00, keycode, 0x00, 0x00, 0x00, 0x00, 0x00])
|
||||
hid_device.send_data(keyboardData)
|
||||
|
||||
deviceData.setKeyBoardData(bytearray([0x01, 0x00, 0x00, keycode, 0x00, 0x00, 0x00, 0x00, 0x00]))
|
||||
hid_device.send_data(deviceData.getKeyBoardData())
|
||||
# Key release
|
||||
keyboardData = bytearray([0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
|
||||
hid_device.send_data(keyboardData)
|
||||
deviceData.setKeyBoardData(bytearray([0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]))
|
||||
hid_device.send_data(deviceData.getKeyBoardData())
|
||||
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@@ -508,10 +524,10 @@ async def main():
|
||||
"buffer_size:" + str(buffer_size))
|
||||
if report_type == Message.ReportType.INPUT_REPORT:
|
||||
if report_id == 1:
|
||||
retValue.data = keyboardData
|
||||
retValue.data = deviceData.getKeyBoardData()
|
||||
retValue.status = hid_device.GetSetReturn.SUCCESS
|
||||
elif report_id == 2:
|
||||
retValue.data = mouseData
|
||||
retValue.data = deviceData.getMouseData()
|
||||
retValue.status = hid_device.GetSetReturn.SUCCESS
|
||||
else:
|
||||
retValue.status = hid_device.GetSetReturn.REPORT_ID_NOT_FOUND
|
||||
|
||||
Reference in New Issue
Block a user