forked from auracaster/bumble_mirror
Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 4ffc050eed | |||
| 60678419a0 | |||
| 648dcc9305 |
@@ -16,7 +16,7 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
strategy:
|
||||
matrix:
|
||||
python-version: ["3.8", "3.9", "3.10"]
|
||||
python-version: ["3.8", "3.9", "3.10", "3.11"]
|
||||
fail-fast: false
|
||||
|
||||
steps:
|
||||
|
||||
+23
-22
@@ -283,8 +283,7 @@ class IncludedServiceDeclaration(Attribute):
|
||||
f'IncludedServiceDefinition(handle=0x{self.handle:04X}, '
|
||||
f'group_starting_handle=0x{self.service.handle:04X}, '
|
||||
f'group_ending_handle=0x{self.service.end_group_handle:04X}, '
|
||||
f'uuid={self.service.uuid}, '
|
||||
f'{self.service.properties!s})'
|
||||
f'uuid={self.service.uuid})'
|
||||
)
|
||||
|
||||
|
||||
@@ -309,31 +308,33 @@ class Characteristic(Attribute):
|
||||
AUTHENTICATED_SIGNED_WRITES = 0x40
|
||||
EXTENDED_PROPERTIES = 0x80
|
||||
|
||||
@staticmethod
|
||||
def from_string(properties_str: str) -> Characteristic.Properties:
|
||||
property_names: List[str] = []
|
||||
for property in Characteristic.Properties:
|
||||
if property.name is None:
|
||||
raise TypeError()
|
||||
property_names.append(property.name)
|
||||
|
||||
def string_to_property(property_string) -> Characteristic.Properties:
|
||||
for property in zip(Characteristic.Properties, property_names):
|
||||
if property_string == property[1]:
|
||||
return property[0]
|
||||
raise TypeError(f"Unable to convert {property_string} to Property")
|
||||
|
||||
@classmethod
|
||||
def from_string(cls, properties_str: str) -> Characteristic.Properties:
|
||||
try:
|
||||
return functools.reduce(
|
||||
lambda x, y: x | string_to_property(y),
|
||||
properties_str.split(","),
|
||||
lambda x, y: x | cls[y],
|
||||
properties_str.replace("|", ",").split(","),
|
||||
Characteristic.Properties(0),
|
||||
)
|
||||
except TypeError:
|
||||
except (TypeError, KeyError):
|
||||
# The check for `p.name is not None` here is needed because for InFlag
|
||||
# enums, the .name property can be None, when the enum value is 0,
|
||||
# so the type hint for .name is Optional[str].
|
||||
enum_list: List[str] = [p.name for p in cls if p.name is not None]
|
||||
enum_list_str = ",".join(enum_list)
|
||||
raise TypeError(
|
||||
f"Characteristic.Properties::from_string() error:\nExpected a string containing any of the keys, separated by commas: {','.join(property_names)}\nGot: {properties_str}"
|
||||
f"Characteristic.Properties::from_string() error:\nExpected a string containing any of the keys, separated by , or |: {enum_list_str}\nGot: {properties_str}"
|
||||
)
|
||||
|
||||
def __str__(self):
|
||||
# NOTE: we override this method to offer a consistent result between python
|
||||
# versions: the value returned by IntFlag.__str__() changed in version 11.
|
||||
return '|'.join(
|
||||
flag.name
|
||||
for flag in Characteristic.Properties
|
||||
if self.value & flag.value and flag.name is not None
|
||||
)
|
||||
|
||||
# For backwards compatibility these are defined here
|
||||
# For new code, please use Characteristic.Properties.X
|
||||
BROADCAST = Properties.BROADCAST
|
||||
@@ -373,7 +374,7 @@ class Characteristic(Attribute):
|
||||
f'Characteristic(handle=0x{self.handle:04X}, '
|
||||
f'end=0x{self.end_group_handle:04X}, '
|
||||
f'uuid={self.uuid}, '
|
||||
f'{self.properties!s})'
|
||||
f'{self.properties})'
|
||||
)
|
||||
|
||||
|
||||
@@ -401,7 +402,7 @@ class CharacteristicDeclaration(Attribute):
|
||||
f'CharacteristicDeclaration(handle=0x{self.handle:04X}, '
|
||||
f'value_handle=0x{self.value_handle:04X}, '
|
||||
f'uuid={self.characteristic.uuid}, '
|
||||
f'{self.characteristic.properties!s})'
|
||||
f'{self.characteristic.properties})'
|
||||
)
|
||||
|
||||
|
||||
|
||||
+25
-29
@@ -724,12 +724,12 @@ class Channel(EventEmitter):
|
||||
response: Optional[asyncio.Future[bytes]]
|
||||
sink: Optional[Callable[[bytes], Any]]
|
||||
state: int
|
||||
connection: 'Connection'
|
||||
connection: Connection
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
manager: 'ChannelManager',
|
||||
connection: 'Connection',
|
||||
connection: Connection,
|
||||
signaling_cid: int,
|
||||
psm: int,
|
||||
source_cid: int,
|
||||
@@ -1033,7 +1033,7 @@ class LeConnectionOrientedChannel(EventEmitter):
|
||||
disconnection_result: Optional[asyncio.Future[None]]
|
||||
out_sdu: Optional[bytes]
|
||||
state: int
|
||||
connection: 'Connection'
|
||||
connection: Connection
|
||||
|
||||
@staticmethod
|
||||
def state_name(state: int) -> str:
|
||||
@@ -1042,7 +1042,7 @@ class LeConnectionOrientedChannel(EventEmitter):
|
||||
def __init__(
|
||||
self,
|
||||
manager: 'ChannelManager',
|
||||
connection: 'Connection',
|
||||
connection: Connection,
|
||||
le_psm: int,
|
||||
source_cid: int,
|
||||
destination_cid: int,
|
||||
@@ -1471,7 +1471,7 @@ class ChannelManager:
|
||||
):
|
||||
raise ValueError('MPS out of range')
|
||||
|
||||
def next_identifier(self, connection: 'Connection') -> int:
|
||||
def next_identifier(self, connection: Connection) -> int:
|
||||
identifier = (self.identifiers.setdefault(connection.handle, 0) + 1) % 256
|
||||
self.identifiers[connection.handle] = identifier
|
||||
return identifier
|
||||
@@ -1574,7 +1574,7 @@ class ChannelManager:
|
||||
)
|
||||
self.host.send_l2cap_pdu(connection.handle, cid, bytes(pdu))
|
||||
|
||||
def on_pdu(self, connection: 'Connection', cid: int, pdu) -> None:
|
||||
def on_pdu(self, connection: Connection, cid: int, pdu) -> None:
|
||||
if cid in (L2CAP_SIGNALING_CID, L2CAP_LE_SIGNALING_CID):
|
||||
# Parse the L2CAP payload into a Control Frame object
|
||||
control_frame = L2CAP_Control_Frame.from_bytes(pdu)
|
||||
@@ -1596,7 +1596,7 @@ class ChannelManager:
|
||||
channel.on_pdu(pdu)
|
||||
|
||||
def send_control_frame(
|
||||
self, connection: 'Connection', cid: int, control_frame
|
||||
self, connection: Connection, cid: int, control_frame
|
||||
) -> None:
|
||||
logger.debug(
|
||||
f'{color(">>> Sending L2CAP Signaling Control Frame", "blue")} '
|
||||
@@ -1605,9 +1605,7 @@ class ChannelManager:
|
||||
)
|
||||
self.host.send_l2cap_pdu(connection.handle, cid, bytes(control_frame))
|
||||
|
||||
def on_control_frame(
|
||||
self, connection: 'Connection', cid: int, control_frame
|
||||
) -> None:
|
||||
def on_control_frame(self, connection: Connection, cid: int, control_frame) -> None:
|
||||
logger.debug(
|
||||
f'{color("<<< Received L2CAP Signaling Control Frame", "green")} '
|
||||
f'on connection [0x{connection.handle:04X}] (CID={cid}) '
|
||||
@@ -1645,12 +1643,12 @@ class ChannelManager:
|
||||
)
|
||||
|
||||
def on_l2cap_command_reject(
|
||||
self, _connection: 'Connection', _cid: int, packet
|
||||
self, _connection: Connection, _cid: int, packet
|
||||
) -> None:
|
||||
logger.warning(f'{color("!!! Command rejected:", "red")} {packet.reason}')
|
||||
|
||||
def on_l2cap_connection_request(
|
||||
self, connection: 'Connection', cid: int, request
|
||||
self, connection: Connection, cid: int, request
|
||||
) -> None:
|
||||
# Check if there's a server for this PSM
|
||||
server = self.servers.get(request.psm)
|
||||
@@ -1704,7 +1702,7 @@ class ChannelManager:
|
||||
)
|
||||
|
||||
def on_l2cap_connection_response(
|
||||
self, connection: 'Connection', cid: int, response
|
||||
self, connection: Connection, cid: int, response
|
||||
) -> None:
|
||||
if (
|
||||
channel := self.find_channel(connection.handle, response.source_cid)
|
||||
@@ -1721,7 +1719,7 @@ class ChannelManager:
|
||||
channel.on_connection_response(response)
|
||||
|
||||
def on_l2cap_configure_request(
|
||||
self, connection: 'Connection', cid: int, request
|
||||
self, connection: Connection, cid: int, request
|
||||
) -> None:
|
||||
if (
|
||||
channel := self.find_channel(connection.handle, request.destination_cid)
|
||||
@@ -1738,7 +1736,7 @@ class ChannelManager:
|
||||
channel.on_configure_request(request)
|
||||
|
||||
def on_l2cap_configure_response(
|
||||
self, connection: 'Connection', cid: int, response
|
||||
self, connection: Connection, cid: int, response
|
||||
) -> None:
|
||||
if (
|
||||
channel := self.find_channel(connection.handle, response.source_cid)
|
||||
@@ -1755,7 +1753,7 @@ class ChannelManager:
|
||||
channel.on_configure_response(response)
|
||||
|
||||
def on_l2cap_disconnection_request(
|
||||
self, connection: 'Connection', cid: int, request
|
||||
self, connection: Connection, cid: int, request
|
||||
) -> None:
|
||||
if (
|
||||
channel := self.find_channel(connection.handle, request.destination_cid)
|
||||
@@ -1772,7 +1770,7 @@ class ChannelManager:
|
||||
channel.on_disconnection_request(request)
|
||||
|
||||
def on_l2cap_disconnection_response(
|
||||
self, connection: 'Connection', cid: int, response
|
||||
self, connection: Connection, cid: int, response
|
||||
) -> None:
|
||||
if (
|
||||
channel := self.find_channel(connection.handle, response.source_cid)
|
||||
@@ -1788,9 +1786,7 @@ class ChannelManager:
|
||||
|
||||
channel.on_disconnection_response(response)
|
||||
|
||||
def on_l2cap_echo_request(
|
||||
self, connection: 'Connection', cid: int, request
|
||||
) -> None:
|
||||
def on_l2cap_echo_request(self, connection: Connection, cid: int, request) -> None:
|
||||
logger.debug(f'<<< Echo request: data={request.data.hex()}')
|
||||
self.send_control_frame(
|
||||
connection,
|
||||
@@ -1799,13 +1795,13 @@ class ChannelManager:
|
||||
)
|
||||
|
||||
def on_l2cap_echo_response(
|
||||
self, _connection: 'Connection', _cid: int, response
|
||||
self, _connection: Connection, _cid: int, response
|
||||
) -> None:
|
||||
logger.debug(f'<<< Echo response: data={response.data.hex()}')
|
||||
# TODO notify listeners
|
||||
|
||||
def on_l2cap_information_request(
|
||||
self, connection: 'Connection', cid: int, request
|
||||
self, connection: Connection, cid: int, request
|
||||
) -> None:
|
||||
if request.info_type == L2CAP_Information_Request.CONNECTIONLESS_MTU:
|
||||
result = L2CAP_Information_Response.SUCCESS
|
||||
@@ -1831,7 +1827,7 @@ class ChannelManager:
|
||||
)
|
||||
|
||||
def on_l2cap_connection_parameter_update_request(
|
||||
self, connection: 'Connection', cid: int, request
|
||||
self, connection: Connection, cid: int, request
|
||||
):
|
||||
if connection.role == BT_CENTRAL_ROLE:
|
||||
self.send_control_frame(
|
||||
@@ -1864,13 +1860,13 @@ class ChannelManager:
|
||||
)
|
||||
|
||||
def on_l2cap_connection_parameter_update_response(
|
||||
self, connection: 'Connection', cid: int, response
|
||||
self, connection: Connection, cid: int, response
|
||||
) -> None:
|
||||
# TODO: check response
|
||||
pass
|
||||
|
||||
def on_l2cap_le_credit_based_connection_request(
|
||||
self, connection: 'Connection', cid: int, request
|
||||
self, connection: Connection, cid: int, request
|
||||
) -> None:
|
||||
if request.le_psm in self.le_coc_servers:
|
||||
(server, max_credits, mtu, mps) = self.le_coc_servers[request.le_psm]
|
||||
@@ -1974,7 +1970,7 @@ class ChannelManager:
|
||||
)
|
||||
|
||||
def on_l2cap_le_credit_based_connection_response(
|
||||
self, connection: 'Connection', _cid: int, response
|
||||
self, connection: Connection, _cid: int, response
|
||||
) -> None:
|
||||
# Find the pending request by identifier
|
||||
request = self.le_coc_requests.get(response.identifier)
|
||||
@@ -1999,7 +1995,7 @@ class ChannelManager:
|
||||
channel.on_connection_response(response)
|
||||
|
||||
def on_l2cap_le_flow_control_credit(
|
||||
self, connection: 'Connection', _cid: int, credit
|
||||
self, connection: Connection, _cid: int, credit
|
||||
) -> None:
|
||||
channel = self.find_le_coc_channel(connection.handle, credit.cid)
|
||||
if channel is None:
|
||||
@@ -2015,7 +2011,7 @@ class ChannelManager:
|
||||
del connection_channels[channel.source_cid]
|
||||
|
||||
async def open_le_coc(
|
||||
self, connection: 'Connection', psm: int, max_credits: int, mtu: int, mps: int
|
||||
self, connection: Connection, psm: int, max_credits: int, mtu: int, mps: int
|
||||
) -> LeConnectionOrientedChannel:
|
||||
self.check_le_coc_parameters(max_credits, mtu, mps)
|
||||
|
||||
@@ -2057,7 +2053,7 @@ class ChannelManager:
|
||||
|
||||
return channel
|
||||
|
||||
async def connect(self, connection: 'Connection', psm: int) -> Channel:
|
||||
async def connect(self, connection: Connection, psm: int) -> Channel:
|
||||
# NOTE: this implementation hard-codes BR/EDR
|
||||
|
||||
# Find a free CID for a new channel
|
||||
|
||||
+15
-11
@@ -803,14 +803,14 @@ async def test_mtu_exchange():
|
||||
# -----------------------------------------------------------------------------
|
||||
def test_char_property_to_string():
|
||||
# single
|
||||
assert str(Characteristic.Properties(0x01)) == "Properties.BROADCAST"
|
||||
assert str(Characteristic.Properties.BROADCAST) == "Properties.BROADCAST"
|
||||
assert str(Characteristic.Properties(0x01)) == "BROADCAST"
|
||||
assert str(Characteristic.Properties.BROADCAST) == "BROADCAST"
|
||||
|
||||
# double
|
||||
assert str(Characteristic.Properties(0x03)) == "Properties.READ|BROADCAST"
|
||||
assert str(Characteristic.Properties(0x03)) == "BROADCAST|READ"
|
||||
assert (
|
||||
str(Characteristic.Properties.BROADCAST | Characteristic.Properties.READ)
|
||||
== "Properties.READ|BROADCAST"
|
||||
== "BROADCAST|READ"
|
||||
)
|
||||
|
||||
|
||||
@@ -831,6 +831,10 @@ def test_characteristic_property_from_string():
|
||||
Characteristic.Properties.from_string("READ,BROADCAST")
|
||||
== Characteristic.Properties.BROADCAST | Characteristic.Properties.READ
|
||||
)
|
||||
assert (
|
||||
Characteristic.Properties.from_string("BROADCAST|READ")
|
||||
== Characteristic.Properties.BROADCAST | Characteristic.Properties.READ
|
||||
)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@@ -841,7 +845,7 @@ def test_characteristic_property_from_string_assert():
|
||||
assert (
|
||||
str(e_info.value)
|
||||
== """Characteristic.Properties::from_string() error:
|
||||
Expected a string containing any of the keys, separated by commas: BROADCAST,READ,WRITE_WITHOUT_RESPONSE,WRITE,NOTIFY,INDICATE,AUTHENTICATED_SIGNED_WRITES,EXTENDED_PROPERTIES
|
||||
Expected a string containing any of the keys, separated by , or |: BROADCAST,READ,WRITE_WITHOUT_RESPONSE,WRITE,NOTIFY,INDICATE,AUTHENTICATED_SIGNED_WRITES,EXTENDED_PROPERTIES
|
||||
Got: BROADCAST,HELLO"""
|
||||
)
|
||||
|
||||
@@ -866,13 +870,13 @@ async def test_server_string():
|
||||
assert (
|
||||
str(server.gatt_server)
|
||||
== """Service(handle=0x0001, end=0x0005, uuid=UUID-16:1800 (Generic Access))
|
||||
CharacteristicDeclaration(handle=0x0002, value_handle=0x0003, uuid=UUID-16:2A00 (Device Name), Properties.READ)
|
||||
Characteristic(handle=0x0003, end=0x0003, uuid=UUID-16:2A00 (Device Name), Properties.READ)
|
||||
CharacteristicDeclaration(handle=0x0004, value_handle=0x0005, uuid=UUID-16:2A01 (Appearance), Properties.READ)
|
||||
Characteristic(handle=0x0005, end=0x0005, uuid=UUID-16:2A01 (Appearance), Properties.READ)
|
||||
CharacteristicDeclaration(handle=0x0002, value_handle=0x0003, uuid=UUID-16:2A00 (Device Name), READ)
|
||||
Characteristic(handle=0x0003, end=0x0003, uuid=UUID-16:2A00 (Device Name), READ)
|
||||
CharacteristicDeclaration(handle=0x0004, value_handle=0x0005, uuid=UUID-16:2A01 (Appearance), READ)
|
||||
Characteristic(handle=0x0005, end=0x0005, uuid=UUID-16:2A01 (Appearance), READ)
|
||||
Service(handle=0x0006, end=0x0009, uuid=3A657F47-D34F-46B3-B1EC-698E29B6B829)
|
||||
CharacteristicDeclaration(handle=0x0007, value_handle=0x0008, uuid=FDB159DB-036C-49E3-B3DB-6325AC750806, Properties.NOTIFY|WRITE|READ)
|
||||
Characteristic(handle=0x0008, end=0x0009, uuid=FDB159DB-036C-49E3-B3DB-6325AC750806, Properties.NOTIFY|WRITE|READ)
|
||||
CharacteristicDeclaration(handle=0x0007, value_handle=0x0008, uuid=FDB159DB-036C-49E3-B3DB-6325AC750806, READ|WRITE|NOTIFY)
|
||||
Characteristic(handle=0x0008, end=0x0009, uuid=FDB159DB-036C-49E3-B3DB-6325AC750806, READ|WRITE|NOTIFY)
|
||||
Descriptor(handle=0x0009, type=UUID-16:2902 (Client Characteristic Configuration), value=0000)"""
|
||||
)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user