send_data comment fix and lint error fix

This commit is contained in:
skarnataki
2023-11-24 06:11:52 +00:00
parent d1033c018a
commit 9324237828
3 changed files with 113 additions and 96 deletions

View File

@@ -120,6 +120,7 @@ class SetReportMessage(Message):
def __bytes__(self) -> bytes: def __bytes__(self) -> bytes:
return self.header(self.report_type) + self.data return self.header(self.report_type) + self.data
@dataclass @dataclass
class SendControlData(Message): class SendControlData(Message):
report_type: int report_type: int
@@ -131,6 +132,8 @@ class SendControlData(Message):
packet_bytes.extend(self.data) packet_bytes.extend(self.data)
return self.header(self.report_type) + packet_bytes return self.header(self.report_type) + packet_bytes
@dataclass @dataclass
class GetProtocolMessage(Message): class GetProtocolMessage(Message):
message_type = Message.MessageType.GET_PROTOCOL message_type = Message.MessageType.GET_PROTOCOL
@@ -147,6 +150,7 @@ class SetProtocolMessage(Message):
def __bytes__(self) -> bytes: def __bytes__(self) -> bytes:
return self.header(self.protocol_mode) return self.header(self.protocol_mode)
@dataclass @dataclass
class GetProtocolReplyMessage(Message): class GetProtocolReplyMessage(Message):
protocol_mode: int protocol_mode: int
@@ -157,6 +161,7 @@ class GetProtocolReplyMessage(Message):
packet_bytes.append(self.protocol_mode) packet_bytes.append(self.protocol_mode)
return self.header(Message.ReportType.OTHER_REPORT) + packet_bytes return self.header(Message.ReportType.OTHER_REPORT) + packet_bytes
@dataclass @dataclass
class Suspend(Message): class Suspend(Message):
message_type = Message.MessageType.CONTROL message_type = Message.MessageType.CONTROL
@@ -180,7 +185,8 @@ class VirtualCableUnplug(Message):
def __bytes__(self) -> bytes: def __bytes__(self) -> bytes:
return self.header(Message.ControlCommand.VIRTUAL_CABLE_UNPLUG) return self.header(Message.ControlCommand.VIRTUAL_CABLE_UNPLUG)
#Device sends input report, host sends output report.
# Device sends input report, host sends output report.
@dataclass @dataclass
class SendData(Message): class SendData(Message):
data: bytes data: bytes
@@ -190,6 +196,7 @@ class SendData(Message):
def __bytes__(self) -> bytes: def __bytes__(self) -> bytes:
return self.header(self.report_type) + self.data return self.header(self.report_type) + self.data
@dataclass @dataclass
class SendHandshakeMessage(Message): class SendHandshakeMessage(Message):
result_code: int result_code: int
@@ -208,7 +215,6 @@ class HID(EventEmitter):
HOST = 0x00 HOST = 0x00
DEVICE = 0x01 DEVICE = 0x01
def __init__(self, device: Device, role: int) -> None: def __init__(self, device: Device, role: int) -> None:
super().__init__() super().__init__()
self.device = device self.device = device
@@ -347,7 +353,7 @@ class HID(EventEmitter):
assert self.l2cap_intr_channel assert self.l2cap_intr_channel
self.l2cap_intr_channel.send_pdu(msg) self.l2cap_intr_channel.send_pdu(msg)
def send_report_on_interrupt(self, data: bytes) -> None: def send_data(self, data: bytes) -> None:
if self.role == HID.Role.HOST: if self.role == HID.Role.HOST:
report_type = Message.ReportType.OUTPUT_REPORT report_type = Message.ReportType.OUTPUT_REPORT
else: else:
@@ -368,20 +374,18 @@ class HID(EventEmitter):
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class Device(HID): class Device(HID):
class GetSetReturn(enum.IntEnum): class GetSetReturn(enum.IntEnum):
FAILURE = 0x00 FAILURE = 0x00
REPORT_ID_NOT_FOUND = 0x01 REPORT_ID_NOT_FOUND = 0x01
ERR_UNSUPPORTED_REQUEST = 0x02 ERR_UNSUPPORTED_REQUEST = 0x02
ERR_UNKNOWN = 0x03 ERR_UNKNOWN = 0x03
SUCCESS = 0xff SUCCESS = 0xFF
class GetSetStatus:
class GetSetStatus():
def __init__(self) -> None: def __init__(self) -> None:
self.status = 0 self.status = 0
self.data=None self.data = None
def __init__(self, device: Device) -> None: def __init__(self, device: Device) -> None:
super().__init__(device, HID.Role.DEVICE) super().__init__(device, HID.Role.DEVICE)
@@ -396,56 +400,56 @@ class Device(HID):
logger.debug(f'>>> HID HANDSHAKE MESSAGE, PDU: {hid_message.hex()}') logger.debug(f'>>> HID HANDSHAKE MESSAGE, PDU: {hid_message.hex()}')
self.send_pdu_on_ctrl(hid_message) self.send_pdu_on_ctrl(hid_message)
def send_control_data(self,report_type: int, data: bytes): def send_control_data(self, report_type: int, data: bytes):
msg = SendControlData(report_type= report_type, data=data) msg = SendControlData(report_type=report_type, data=data)
hid_message = bytes(msg) hid_message = bytes(msg)
logger.debug(f'>>> HID CONTROL DATA: {hid_message.hex()}') logger.debug(f'>>> HID CONTROL DATA: {hid_message.hex()}')
self.send_pdu_on_ctrl(hid_message) self.send_pdu_on_ctrl(hid_message)
def handle_get_report(self, pdu: bytes): def handle_get_report(self, pdu: bytes):
ret = self.GetSetStatus() ret = self.GetSetStatus()
report_type=pdu[0] & 0x03 report_type = pdu[0] & 0x03
buffer_flag = (pdu[0] & 0x08) >> 3 buffer_flag = (pdu[0] & 0x08) >> 3
report_id = pdu[1] report_id = pdu[1]
logger.debug("buffer_flag: " + str(buffer_flag)) logger.debug("buffer_flag: " + str(buffer_flag))
if(buffer_flag == 1): if buffer_flag == 1:
buffer_size = (pdu[3] << 8) | pdu[2] buffer_size = (pdu[3] << 8) | pdu[2]
else: else:
buffer_size = 0 buffer_size = 0
if(self.get_report_cb != None): if self.get_report_cb != None:
ret = self.get_report_cb(report_id, report_type, buffer_size) ret = self.get_report_cb(report_id, report_type, buffer_size)
if(ret.status == self.GetSetReturn.FAILURE): if ret.status == self.GetSetReturn.FAILURE:
self.send_handshake_message(Message.Handshake.ERR_UNKNOWN) self.send_handshake_message(Message.Handshake.ERR_UNKNOWN)
elif(ret.status == self.GetSetReturn.SUCCESS): elif ret.status == self.GetSetReturn.SUCCESS:
data = bytearray() data = bytearray()
data.append(report_id) data.append(report_id)
data.extend(ret.data) data.extend(ret.data)
if(len(data)<self.l2cap_ctrl_channel.mtu): if len(data) < self.l2cap_ctrl_channel.mtu:
self.send_control_data(report_type=report_type, data = data) self.send_control_data(report_type=report_type, data=data)
else: else:
self.send_handshake_message(Message.Handshake.ERR_INVALID_PARAMETER) self.send_handshake_message(Message.Handshake.ERR_INVALID_PARAMETER)
elif(ret.status == self.GetSetReturn.REPORT_ID_NOT_FOUND): elif ret.status == self.GetSetReturn.REPORT_ID_NOT_FOUND:
self.send_handshake_message(Message.Handshake.ERR_INVALID_REPORT_ID) self.send_handshake_message(Message.Handshake.ERR_INVALID_REPORT_ID)
elif(ret.status == self.GetSetReturn.ERR_UNSUPPORTED_REQUEST): elif ret.status == self.GetSetReturn.ERR_UNSUPPORTED_REQUEST:
self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST) self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST)
else: else:
logger.debug("GetReport callback not registered !!") logger.debug("GetReport callback not registered !!")
self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST) self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST)
def register_get_report_cb(self,cb): def register_get_report_cb(self, cb):
self.get_report_cb=cb self.get_report_cb = cb
logger.debug("GetReport callback registered successfully") logger.debug("GetReport callback registered successfully")
def handle_set_report(self, pdu: bytes): def handle_set_report(self, pdu: bytes):
if(self.set_report_cb != None): if self.set_report_cb != None:
report_type=pdu[0] & 0x03 report_type = pdu[0] & 0x03
report_id = pdu[1] report_id = pdu[1]
report_data = pdu[2:] report_data = pdu[2:]
ret = self.set_report_cb(report_id, report_type, report_data) ret = self.set_report_cb(report_id, report_type, report_data)
if(ret.status == self.GetSetReturn.SUCCESS): if ret.status == self.GetSetReturn.SUCCESS:
self.send_handshake_message(Message.Handshake.SUCCESSFUL) self.send_handshake_message(Message.Handshake.SUCCESSFUL)
return return
else: else:
@@ -454,14 +458,14 @@ class Device(HID):
self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST) self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST)
def register_set_report_cb(self, cb): def register_set_report_cb(self, cb):
self.set_report_cb=cb self.set_report_cb = cb
logger.debug("SetReport callback registered successfully") logger.debug("SetReport callback registered successfully")
def handle_get_protocol(self, pdu: bytes): def handle_get_protocol(self, pdu: bytes):
ret = self.GetSetStatus() ret = self.GetSetStatus()
if(self.get_protocol_cb != None): if self.get_protocol_cb != None:
ret=self.get_protocol_cb() ret = self.get_protocol_cb()
if(ret.status == self.GetSetReturn.SUCCESS): if ret.status == self.GetSetReturn.SUCCESS:
self.send_control_data(Message.ReportType.OTHER_REPORT, ret.data) self.send_control_data(Message.ReportType.OTHER_REPORT, ret.data)
return return
else: else:
@@ -470,25 +474,25 @@ class Device(HID):
self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST) self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST)
def register_get_protocol_cb(self, cb): def register_get_protocol_cb(self, cb):
self.get_protocol_cb=cb self.get_protocol_cb = cb
logger.debug("GetProtocol callback registered successfully") logger.debug("GetProtocol callback registered successfully")
def handle_set_protocol(self, pdu: bytes): def handle_set_protocol(self, pdu: bytes):
ret = self.GetSetStatus() ret = self.GetSetStatus()
if(self.set_protocol_cb != None): if self.set_protocol_cb != None:
ret=self.set_protocol_cb(pdu[0] & 0x01) ret = self.set_protocol_cb(pdu[0] & 0x01)
if(ret.status == self.GetSetReturn.SUCCESS): if ret.status == self.GetSetReturn.SUCCESS:
return return
else: else:
logger.debug("SetProtocol callback not registered !!") logger.debug("SetProtocol callback not registered !!")
self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST) self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST)
def register_set_protocol_cb(self, cb): def register_set_protocol_cb(self, cb):
self.set_protocol_cb=cb self.set_protocol_cb = cb
logger.debug("SetProtocol callback registered successfully") logger.debug("SetProtocol callback registered successfully")
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class Host(HID): class Host(HID):
def __init__(self, device: Device) -> None: def __init__(self, device: Device) -> None:

View File

@@ -60,7 +60,7 @@ from bumble.utils import AsyncRunner
SDP_HID_SERVICE_NAME_ATTRIBUTE_ID = 0x0100 SDP_HID_SERVICE_NAME_ATTRIBUTE_ID = 0x0100
SDP_HID_SERVICE_DESCRIPTION_ATTRIBUTE_ID = 0x0101 SDP_HID_SERVICE_DESCRIPTION_ATTRIBUTE_ID = 0x0101
SDP_HID_PROVIDER_NAME_ATTRIBUTE_ID = 0x0102 SDP_HID_PROVIDER_NAME_ATTRIBUTE_ID = 0x0102
SDP_HID_DEVICE_RELEASE_NUMBER_ATTRIBUTE_ID = 0x0200 # [DEPRECATED] SDP_HID_DEVICE_RELEASE_NUMBER_ATTRIBUTE_ID = 0x0200 # [DEPRECATED]
SDP_HID_PARSER_VERSION_ATTRIBUTE_ID = 0x0201 SDP_HID_PARSER_VERSION_ATTRIBUTE_ID = 0x0201
SDP_HID_DEVICE_SUBCLASS_ATTRIBUTE_ID = 0x0202 SDP_HID_DEVICE_SUBCLASS_ATTRIBUTE_ID = 0x0202
SDP_HID_COUNTRY_CODE_ATTRIBUTE_ID = 0x0203 SDP_HID_COUNTRY_CODE_ATTRIBUTE_ID = 0x0203
@@ -68,20 +68,20 @@ SDP_HID_VIRTUAL_CABLE_ATTRIBUTE_ID = 0x0204
SDP_HID_RECONNECT_INITIATE_ATTRIBUTE_ID = 0x0205 SDP_HID_RECONNECT_INITIATE_ATTRIBUTE_ID = 0x0205
SDP_HID_DESCRIPTOR_LIST_ATTRIBUTE_ID = 0x0206 SDP_HID_DESCRIPTOR_LIST_ATTRIBUTE_ID = 0x0206
SDP_HID_LANGID_BASE_LIST_ATTRIBUTE_ID = 0x0207 SDP_HID_LANGID_BASE_LIST_ATTRIBUTE_ID = 0x0207
SDP_HID_SDP_DISABLE_ATTRIBUTE_ID = 0x0208 # [DEPRECATED] SDP_HID_SDP_DISABLE_ATTRIBUTE_ID = 0x0208 # [DEPRECATED]
SDP_HID_BATTERY_POWER_ATTRIBUTE_ID = 0x0209 SDP_HID_BATTERY_POWER_ATTRIBUTE_ID = 0x0209
SDP_HID_REMOTE_WAKE_ATTRIBUTE_ID = 0x020A SDP_HID_REMOTE_WAKE_ATTRIBUTE_ID = 0x020A
SDP_HID_PROFILE_VERSION_ATTRIBUTE_ID = 0x020B # DEPRECATED] SDP_HID_PROFILE_VERSION_ATTRIBUTE_ID = 0x020B # DEPRECATED]
SDP_HID_SUPERVISION_TIMEOUT_ATTRIBUTE_ID = 0x020C SDP_HID_SUPERVISION_TIMEOUT_ATTRIBUTE_ID = 0x020C
SDP_HID_NORMALLY_CONNECTABLE_ATTRIBUTE_ID = 0x020D SDP_HID_NORMALLY_CONNECTABLE_ATTRIBUTE_ID = 0x020D
SDP_HID_BOOT_DEVICE_ATTRIBUTE_ID = 0x020E SDP_HID_BOOT_DEVICE_ATTRIBUTE_ID = 0x020E
SDP_HID_SSR_HOST_MAX_LATENCY_ATTRIBUTE_ID = 0x020F SDP_HID_SSR_HOST_MAX_LATENCY_ATTRIBUTE_ID = 0x020F
SDP_HID_SSR_HOST_MIN_TIMEOUT_ATTRIBUTE_ID = 0x0210 SDP_HID_SSR_HOST_MIN_TIMEOUT_ATTRIBUTE_ID = 0x0210
# Refer to HID profile specification v1.1.1, "5.3 Service Discovery Protocol (SDP)" for details # Refer to HID profile specification v1.1.1, "5.3 Service Discovery Protocol (SDP)" for details
# HID SDP attribute values # HID SDP attribute values
LANGUAGE = 0x656e # 0x656E uint16 “en” (English) LANGUAGE = 0x656E # 0x656E uint16 “en” (English)
ENCODING = 0x6a # 0x006A uint16 UTF-8 encoding ENCODING = 0x6A # 0x006A uint16 UTF-8 encoding
PRIMARY_LANGUAGE_BASE_ID = 0x100 # 0x0100 uint16 PrimaryLanguageBaseID PRIMARY_LANGUAGE_BASE_ID = 0x100 # 0x0100 uint16 PrimaryLanguageBaseID
VERSION_NUMBER = 0x0101 # 0x0101 uint16 version number (v1.1) VERSION_NUMBER = 0x0101 # 0x0101 uint16 version number (v1.1)
SERVICE_NAME = b'Bumble HID' SERVICE_NAME = b'Bumble HID'
@@ -222,7 +222,7 @@ HID_REPORT_MAP = bytes( # Text String, 50 Octet Report Descriptor
0x06, # . Input (Data,Var,Rel,No Wrap,Linear,Preferred State,No Null Position) 0x06, # . Input (Data,Var,Rel,No Wrap,Linear,Preferred State,No Null Position)
0xC0, # . End Collection 0xC0, # . End Collection
0xC0, # End Collection 0xC0, # End Collection
] ]
) )
@@ -298,7 +298,9 @@ def sdp_records():
DataElement.sequence( DataElement.sequence(
[ [
DataElement.uuid(BT_L2CAP_PROTOCOL_ID), DataElement.uuid(BT_L2CAP_PROTOCOL_ID),
DataElement.unsigned_integer_16(HID_INTERRUPT_PSM), DataElement.unsigned_integer_16(
HID_INTERRUPT_PSM
),
] ]
), ),
DataElement.sequence( DataElement.sequence(
@@ -362,8 +364,12 @@ def sdp_records():
[ [
DataElement.sequence( DataElement.sequence(
[ [
DataElement.unsigned_integer_16(HID_LANGID_BASE_LANGUAGE), DataElement.unsigned_integer_16(
DataElement.unsigned_integer_16(HID_LANGID_BASE_BLUETOOTH_STRING_OFFSET), HID_LANGID_BASE_LANGUAGE
),
DataElement.unsigned_integer_16(
HID_LANGID_BASE_BLUETOOTH_STRING_OFFSET
),
] ]
), ),
] ]
@@ -415,8 +421,8 @@ class DeviceData:
self.keyboardData = bytearray([0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]) self.keyboardData = bytearray([0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
self.mouseData = bytearray([0x00, 0x00, 0x00, 0x00]) self.mouseData = bytearray([0x00, 0x00, 0x00, 0x00])
#Device's live data - Mouse and Keyboard will be stored in this # Device's live data - Mouse and Keyboard will be stored in this
deviceData=DeviceData() deviceData = DeviceData()
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def keyboard_device(hid_device, command): async def keyboard_device(hid_device, command):
@@ -425,6 +431,7 @@ async def keyboard_device(hid_device, command):
# Start a Websocket server to receive events from a web page # Start a Websocket server to receive events from a web page
async def serve(websocket, _path): async def serve(websocket, _path):
global deviceData global deviceData
# global mouseData
while True: while True:
try: try:
message = await websocket.recv() message = await websocket.recv()
@@ -438,24 +445,25 @@ async def keyboard_device(hid_device, command):
code = ord(key) code = ord(key)
if ord('a') <= code <= ord('z'): if ord('a') <= code <= ord('z'):
hid_code = 0x04 + code - ord('a') hid_code = 0x04 + code - ord('a')
deviceData.keyboardData = bytearray([0x01, 0x00, 0x00, hid_code, 0x00, 0x00, 0x00, 0x00, 0x00]) deviceData.setKeyBoardData(bytearray([0x01, 0x00, 0x00, hid_code, 0x00, 0x00, 0x00, 0x00, 0x00]))
hid_device.send_report_on_interrupt(deviceData.keyboardData) hid_device.send_report_on_interrupt(deviceData.getKeyBoardData())
elif message_type == 'keyup': elif message_type == 'keyup':
deviceData.keyboardData = bytearray([0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]) deviceData.setKeyBoardData(bytearray([0x01, 0x00, 0x00, hid_code, 0x00, 0x00, 0x00, 0x00, 0x00]))
hid_device.send_report_on_interrupt(deviceData.keyboardData) hid_device.send_report_on_interrupt(deviceData.getKeyBoardData())
elif message_type == "mousemove": elif message_type == "mousemove":
# logical min and max values # logical min and max values
log_min = -127 log_min = -127
log_max = 127 log_max = 127
x = parsed['x'] x = parsed['x']
y = parsed['y'] y = parsed['y']
# limiting x and y values within logical max and min range if y > 127:
x = max(log_min, min(log_max, x)) y = 127
y = max(log_min, min(log_max, y)) elif y < -127:
y = -127
x_cord = x.to_bytes(signed = True) x_cord = x.to_bytes(signed = True)
y_cord = y.to_bytes(signed = True) y_cord = y.to_bytes(signed = True)
deviceData.mouseData = bytearray([0x02, 0x00]) + x_cord + y_cord deviceData.setMouseData(bytearray([0x02, 0x00]) + x_cord + y_cord)
hid_device.send_report_on_interrupt(deviceData.mouseData) hid_device.send_report_on_interrupt(deviceData.getMouseData())
except websockets.exceptions.ConnectionClosedOK: except websockets.exceptions.ConnectionClosedOK:
pass pass
@@ -464,7 +472,6 @@ async def keyboard_device(hid_device, command):
await asyncio.get_event_loop().create_future() await asyncio.get_event_loop().create_future()
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def main(): async def main():
if len(sys.argv) < 3: if len(sys.argv) < 3:
@@ -484,7 +491,7 @@ async def main():
hid_host_bd_addr = str(hid_device.remote_device_bd_address) hid_host_bd_addr = str(hid_device.remote_device_bd_address)
await hid_device.disconnect_interrupt_channel() await hid_device.disconnect_interrupt_channel()
await hid_device.disconnect_control_channel() await hid_device.disconnect_control_channel()
await device.keystore.delete(hid_host_bd_addr) #type: ignore await device.keystore.delete(hid_host_bd_addr) # type: ignore
connection = hid_device.connection connection = hid_device.connection
if connection is not None: if connection is not None:
await connection.disconnect() await connection.disconnect()
@@ -492,9 +499,9 @@ async def main():
def on_hid_data_cb(pdu): def on_hid_data_cb(pdu):
print(f'Received Data, PDU: {pdu.hex()}') print(f'Received Data, PDU: {pdu.hex()}')
def on_get_report_cb(report_id,report_type, buffer_size): def on_get_report_cb(report_id, report_type, buffer_size):
retValue = hid_device.GetSetStatus() retValue = hid_device.GetSetStatus()
print("GET_REPORT report_id: " + str(report_id) +"report_type: "+ str(report_type)+ print("GET_REPORT report_id: " + str(report_id) +"report_type: "+ str(report_type)+
"buffer_size:" + str(buffer_size)) "buffer_size:" + str(buffer_size))
if report_type == Message.ReportType.INPUT_REPORT: if report_type == Message.ReportType.INPUT_REPORT:
if report_id == 1: if report_id == 1:
@@ -506,17 +513,17 @@ async def main():
else: else:
retValue.status = hid_device.GetSetReturn.REPORT_ID_NOT_FOUND retValue.status = hid_device.GetSetReturn.REPORT_ID_NOT_FOUND
if(buffer_size): if buffer_size:
data_len = buffer_size -1 data_len = buffer_size - 1
retValue.data = retValue.data[:data_len] retValue.data = retValue.data[:data_len]
elif report_type == Message.ReportType.OUTPUT_REPORT: elif report_type == Message.ReportType.OUTPUT_REPORT:
#This sample app has nothing to do with the report received, to enable PTS #This sample app has nothing to do with the report received, to enable PTS
#testing, we will return single byte random data. #testing, we will return single byte random data.
retValue.data = bytearray([0x11]) retValue.data = bytearray([0x11])
retValue.status = hid_device.GetSetReturn.SUCCESS retValue.status = hid_device.GetSetReturn.SUCCESS
elif report_type == Message.ReportType.FEATURE_REPORT: elif report_type == Message.ReportType.FEATURE_REPORT:
#TBD - not requried for PTS testing # TBD - not requried for PTS testing
retValue.status = hid_device.GetSetReturn.ERR_UNSUPPORTED_REQUEST retValue.status = hid_device.GetSetReturn.ERR_UNSUPPORTED_REQUEST
else: else:
@@ -526,23 +533,22 @@ async def main():
def on_set_report_cb(report_id, report_type, data): def on_set_report_cb(report_id, report_type, data):
retValue = hid_device.GetSetStatus() retValue = hid_device.GetSetStatus()
print("SET_REPORT report_id: " + str(report_id) +"report_type: "+ str(report_type)+ print("SET_REPORT report_id: " + str(report_id) +"report_type: "+ str(report_type)+
"data:" + str(data)) "data:" + str(data))
retValue.status = hid_device.GetSetReturn.SUCCESS retValue.status = hid_device.GetSetReturn.SUCCESS
return retValue return retValue
def on_get_protocol_cb(): def on_get_protocol_cb():
retValue = hid_device.GetSetStatus() retValue = hid_device.GetSetStatus()
retValue.data=protocol_mode.to_bytes() retValue.data = protocol_mode.to_bytes()
retValue.status=hid_device.GetSetReturn.SUCCESS retValue.status = hid_device.GetSetReturn.SUCCESS
return retValue return retValue
def on_set_protocol_cb(protocol): def on_set_protocol_cb(protocol):
retValue = hid_device.GetSetStatus() retValue = hid_device.GetSetStatus()
#We do not support SET_PROTOCOL. # We do not support SET_PROTOCOL.
print("SET_PROTOCOL report_id: " + str(protocol)) print("SET_PROTOCOL report_id: " + str(protocol))
retValue.status=hid_device.GetSetReturn.ERR_UNSUPPORTED_REQUEST retValue.status = hid_device.GetSetReturn.ERR_UNSUPPORTED_REQUEST
return retValue return retValue
def on_virtual_cable_unplug_cb(): def on_virtual_cable_unplug_cb():
@@ -584,7 +590,9 @@ async def main():
async def menu(): async def menu():
reader = await get_stream_reader(sys.stdin) reader = await get_stream_reader(sys.stdin)
while True: while True:
print("\n************************ HID Device Menu *****************************\n") print(
"\n************************ HID Device Menu *****************************\n"
)
print(" 1. Connect Control Channel") print(" 1. Connect Control Channel")
print(" 2. Connect Interrupt Channel") print(" 2. Connect Interrupt Channel")
print(" 3. Disconnect Control Channel") print(" 3. Disconnect Control Channel")
@@ -621,22 +629,26 @@ async def main():
choice1 = choice1.decode('utf-8').strip() choice1 = choice1.decode('utf-8').strip()
if choice1 == '1': if choice1 == '1':
data = bytearray([0x01, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00]) data = bytearray(
hid_device.send_report_on_interrupt(data) [0x01, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00]
data = bytearray([0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]) )
hid_device.send_report_on_interrupt(data) hid_device.send_data(data)
data = bytearray(
[0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]
)
hid_device.send_data(data)
elif choice1 == '2': elif choice1 == '2':
data = bytearray([0x02, 0x00, 0x00, 0xf6]) data = bytearray([0x02, 0x00, 0x00, 0xF6])
hid_device.send_report_on_interrupt(data) hid_device.send_data(data)
data = bytearray([0x02, 0x00, 0x00, 0x00]) data = bytearray([0x02, 0x00, 0x00, 0x00])
hid_device.send_report_on_interrupt(data) hid_device.send_data(data)
elif choice1 == '3': elif choice1 == '3':
data = bytearray([0x00, 0x00, 0x00, 0x00]) data = bytearray([0x00, 0x00, 0x00, 0x00])
hid_device.send_report_on_interrupt(data) hid_device.send_data(data)
data = bytearray([0x00, 0x00, 0x00, 0x00]) data = bytearray([0x00, 0x00, 0x00, 0x00])
hid_device.send_report_on_interrupt(data) hid_device.send_data(data)
else: else:
print('Incorrect option selected') print('Incorrect option selected')
@@ -665,7 +677,9 @@ async def main():
elif choice == '9': elif choice == '9':
hid_host_bd_addr = str(hid_device.remote_device_bd_address) hid_host_bd_addr = str(hid_device.remote_device_bd_address)
connection = await device.connect(hid_host_bd_addr, transport=BT_BR_EDR_TRANSPORT) connection = await device.connect(
hid_host_bd_addr, transport=BT_BR_EDR_TRANSPORT
)
await connection.authenticate() await connection.authenticate()
await connection.encrypt() await connection.encrypt()
@@ -691,7 +705,7 @@ async def main():
await keyboard_device(hid_device, 'web') await keyboard_device(hid_device, 'web')
else: else:
#default option is using keyboard.html (web) # default option is using keyboard.html (web)
await keyboard_device(hid_device, 'web') await keyboard_device(hid_device, 'web')
await hci_source.wait_for_termination() await hci_source.wait_for_termination()

View File

@@ -308,8 +308,8 @@ async def main():
if (report_length <= 1) or (report_id == 0): if (report_length <= 1) or (report_id == 0):
return return
#Parse report over interrupt channel # Parse report over interrupt channel
if (report_type == Message.ReportType.INPUT_REPORT): if report_type == Message.ReportType.INPUT_REPORT:
ReportParser.parse_input_report(pdu[1:]) # type: ignore ReportParser.parse_input_report(pdu[1:]) # type: ignore
async def handle_virtual_cable_unplug(): async def handle_virtual_cable_unplug():
@@ -362,7 +362,6 @@ async def main():
await get_hid_device_sdp_record(connection) await get_hid_device_sdp_record(connection)
async def menu(): async def menu():
reader = await get_stream_reader(sys.stdin) reader = await get_stream_reader(sys.stdin)
while True: while True:
@@ -493,11 +492,11 @@ async def main():
data = bytearray( data = bytearray(
[0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00] [0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]
) )
hid_host.send_report_on_interrupt(data) hid_host.send_data(data)
elif choice1 == '2': elif choice1 == '2':
data = bytearray([0x03, 0x00, 0x0D, 0xFD, 0x00, 0x00]) data = bytearray([0x03, 0x00, 0x0D, 0xFD, 0x00, 0x00])
hid_host.send_report_on_interrupt(data) hid_host.send_data(data)
else: else:
print('Incorrect option selected') print('Incorrect option selected')