Create Characteristic.Property

Move all Characteristic properties into its own `enum.IntFlag` class
This commit is contained in:
Alan Rosenthal
2023-03-30 16:14:53 -04:00
parent eba82b9d9a
commit fb49a87494
21 changed files with 202 additions and 145 deletions

View File

@@ -558,11 +558,13 @@ class GattServer:
# Setup the GATT service
self.speed_tx = Characteristic(
SPEED_TX_UUID,
Characteristic.WRITE,
Characteristic.Properties.WRITE,
Characteristic.WRITEABLE,
CharacteristicValue(write=self.on_tx_write),
)
self.speed_rx = Characteristic(SPEED_RX_UUID, Characteristic.NOTIFY, 0)
self.speed_rx = Characteristic(
SPEED_RX_UUID, Characteristic.Properties.NOTIFY, 0
)
speed_service = Service(
SPEED_SERVICE_UUID,

View File

@@ -873,7 +873,7 @@ class ConsoleApp:
return
# use write with response if supported
with_response = characteristic.properties & Characteristic.WRITE
with_response = characteristic.properties & Characteristic.Properties.WRITE
await characteristic.write_value(value, with_response=with_response)
async def do_local_write(self, params):

View File

@@ -230,13 +230,13 @@ class GattlinkNodeBridge(GattlinkL2capEndpoint, Device.Listener):
)
self.tx_characteristic = Characteristic(
GG_GATTLINK_TX_CHARACTERISTIC_UUID,
Characteristic.NOTIFY,
Characteristic.Properties.NOTIFY,
Characteristic.READABLE,
)
self.tx_characteristic.on('subscription', self.on_tx_subscription)
self.psm_characteristic = Characteristic(
GG_GATTLINK_L2CAP_CHANNEL_PSM_CHARACTERISTIC_UUID,
Characteristic.READ | Characteristic.NOTIFY,
Characteristic.Properties.READ | Characteristic.Properties.NOTIFY,
Characteristic.READABLE,
bytes([psm, 0]),
)

View File

@@ -302,7 +302,8 @@ async def pair(
[
Characteristic(
'552957FB-CF1F-4A31-9535-E78847E1A714',
Characteristic.READ | Characteristic.WRITE,
Characteristic.Properties.READ
| Characteristic.Properties.WRITE,
Characteristic.READABLE | Characteristic.WRITEABLE,
CharacteristicValue(
read=read_with_error, write=write_with_error

View File

@@ -1018,7 +1018,9 @@ class Device(CompositeEventEmitter):
descriptors.append(new_descriptor)
new_characteristic = Characteristic(
uuid=characteristic["uuid"],
properties=characteristic["properties"],
properties=Characteristic.Properties.from_string(
characteristic["properties"]
),
permissions=characteristic["permissions"],
descriptors=descriptors,
)

View File

@@ -41,14 +41,14 @@ class GenericAccessService(Service):
def __init__(self, device_name, appearance=(0, 0)):
device_name_characteristic = Characteristic(
GATT_DEVICE_NAME_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
device_name.encode('utf-8')[:248],
)
appearance_characteristic = Characteristic(
GATT_APPEARANCE_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
struct.pack('<H', (appearance[0] << 6) | appearance[1]),
)

View File

@@ -28,7 +28,7 @@ import enum
import functools
import logging
import struct
from typing import Optional, Sequence, List, Any, Iterable
from typing import Optional, Sequence, List
from .colors import color
from .core import UUID, get_dict_key_by_value
@@ -260,64 +260,67 @@ class Characteristic(Attribute):
'''
uuid: UUID
properties: Characteristic.Properties
# Property flags
BROADCAST = 0x01
READ = 0x02
WRITE_WITHOUT_RESPONSE = 0x04
WRITE = 0x08
NOTIFY = 0x10
INDICATE = 0x20
AUTHENTICATED_SIGNED_WRITES = 0x40
EXTENDED_PROPERTIES = 0x80
class Properties(enum.IntFlag):
"""Property flags"""
PROPERTY_NAMES = {
BROADCAST: 'BROADCAST',
READ: 'READ',
WRITE_WITHOUT_RESPONSE: 'WRITE_WITHOUT_RESPONSE',
WRITE: 'WRITE',
NOTIFY: 'NOTIFY',
INDICATE: 'INDICATE',
AUTHENTICATED_SIGNED_WRITES: 'AUTHENTICATED_SIGNED_WRITES',
EXTENDED_PROPERTIES: 'EXTENDED_PROPERTIES',
}
BROADCAST = 0x01
READ = 0x02
WRITE_WITHOUT_RESPONSE = 0x04
WRITE = 0x08
NOTIFY = 0x10
INDICATE = 0x20
AUTHENTICATED_SIGNED_WRITES = 0x40
EXTENDED_PROPERTIES = 0x80
@staticmethod
def property_name(property_int):
return Characteristic.PROPERTY_NAMES.get(property_int, '')
@staticmethod
def from_string(properties_str: str) -> Characteristic.Properties:
property_names: List[str] = []
for property in Characteristic.Properties:
if property.name is None:
raise TypeError()
property_names.append(property.name)
@staticmethod
def properties_as_string(properties):
return ','.join(
[
Characteristic.property_name(p)
for p in Characteristic.PROPERTY_NAMES
if properties & p
]
)
def string_to_property(property_string) -> Characteristic.Properties:
for property in zip(Characteristic.Properties, property_names):
if property_string == property[1]:
return property[0]
raise TypeError(f"Unable to convert {property_string} to Property")
@staticmethod
def string_to_properties(properties_str: str):
return functools.reduce(
lambda x, y: x | get_dict_key_by_value(Characteristic.PROPERTY_NAMES, y),
properties_str.split(","),
0,
)
try:
return functools.reduce(
lambda x, y: x | string_to_property(y),
properties_str.split(","),
Characteristic.Properties(0),
)
except TypeError:
raise TypeError(
f"Characteristic.Properties::from_string() error:\nExpected a string containing any of the keys, separated by commas: {','.join(property_names)}\nGot: {properties_str}"
)
# For backwards compatibility these are defined here
# For new code, please use Characteristic.Properties.X
BROADCAST = Properties.BROADCAST
READ = Properties.READ
WRITE_WITHOUT_RESPONSE = Properties.WRITE_WITHOUT_RESPONSE
WRITE = Properties.WRITE
NOTIFY = Properties.NOTIFY
INDICATE = Properties.INDICATE
AUTHENTICATED_SIGNED_WRITES = Properties.AUTHENTICATED_SIGNED_WRITES
EXTENDED_PROPERTIES = Properties.EXTENDED_PROPERTIES
def __init__(
self,
uuid,
properties,
properties: Characteristic.Properties,
permissions,
value=b'',
descriptors: Sequence[Descriptor] = (),
):
super().__init__(uuid, permissions, value)
self.uuid = self.type
if isinstance(properties, str):
self.properties = Characteristic.string_to_properties(properties)
else:
self.properties = properties
self.properties = properties
self.descriptors = descriptors
def get_descriptor(self, descriptor_type):
@@ -327,18 +330,15 @@ class Characteristic(Attribute):
return None
def has_properties(self, properties: Iterable[int]):
for prop in properties:
if self.properties & prop == 0:
return False
return True
def has_properties(self, properties: Characteristic.Properties) -> bool:
return self.properties & properties == properties
def __str__(self):
return (
f'Characteristic(handle=0x{self.handle:04X}, '
f'end=0x{self.end_group_handle:04X}, '
f'uuid={self.uuid}, '
f'properties={Characteristic.properties_as_string(self.properties)})'
f'{self.properties!s})'
)
@@ -365,8 +365,8 @@ class CharacteristicDeclaration(Attribute):
return (
f'CharacteristicDeclaration(handle=0x{self.handle:04X}, '
f'value_handle=0x{self.value_handle:04X}, '
f'uuid={self.characteristic.uuid}, properties='
f'{Characteristic.properties_as_string(self.characteristic.properties)})'
f'uuid={self.characteristic.uuid}, '
f'{self.characteristic.properties!s})'
)

View File

@@ -27,7 +27,7 @@ from __future__ import annotations
import asyncio
import logging
import struct
from typing import List, Optional
from typing import List, Optional, Dict, Any, Callable
from pyee import EventEmitter
@@ -138,9 +138,18 @@ class ServiceProxy(AttributeProxy):
class CharacteristicProxy(AttributeProxy):
properties: Characteristic.Properties
descriptors: List[DescriptorProxy]
subscribers: Dict[Any, Callable]
def __init__(self, client, handle, end_group_handle, uuid, properties):
def __init__(
self,
client,
handle,
end_group_handle,
uuid,
properties: Characteristic.Properties,
):
super().__init__(client, handle, end_group_handle, uuid)
self.uuid = uuid
self.properties = properties
@@ -185,7 +194,7 @@ class CharacteristicProxy(AttributeProxy):
return (
f'Characteristic(handle=0x{self.handle:04X}, '
f'uuid={self.uuid}, '
f'properties={Characteristic.properties_as_string(self.properties)})'
f'properties={self.properties!s})'
)
@@ -677,8 +686,8 @@ class Client:
return
if (
characteristic.properties & Characteristic.NOTIFY
and characteristic.properties & Characteristic.INDICATE
characteristic.properties & Characteristic.Properties.NOTIFY
and characteristic.properties & Characteristic.Properties.INDICATE
):
if prefer_notify:
bits = ClientCharacteristicConfigurationBits.NOTIFICATION
@@ -686,10 +695,10 @@ class Client:
else:
bits = ClientCharacteristicConfigurationBits.INDICATION
subscribers = self.indication_subscribers
elif characteristic.properties & Characteristic.NOTIFY:
elif characteristic.properties & Characteristic.Properties.NOTIFY:
bits = ClientCharacteristicConfigurationBits.NOTIFICATION
subscribers = self.notification_subscribers
elif characteristic.properties & Characteristic.INDICATE:
elif characteristic.properties & Characteristic.Properties.INDICATE:
bits = ClientCharacteristicConfigurationBits.INDICATION
subscribers = self.indication_subscribers
else:

View File

@@ -243,7 +243,10 @@ class Server(EventEmitter):
# unless there is one already
if (
characteristic.properties
& (Characteristic.NOTIFY | Characteristic.INDICATE)
& (
Characteristic.Properties.NOTIFY
| Characteristic.Properties.INDICATE
)
and characteristic.get_descriptor(
GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR
)

View File

@@ -103,7 +103,7 @@ class AshaService(TemplateService):
self.read_only_properties_characteristic = Characteristic(
GATT_ASHA_READ_ONLY_PROPERTIES_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
bytes(
[
@@ -120,19 +120,20 @@ class AshaService(TemplateService):
self.audio_control_point_characteristic = Characteristic(
GATT_ASHA_AUDIO_CONTROL_POINT_CHARACTERISTIC,
Characteristic.WRITE | Characteristic.WRITE_WITHOUT_RESPONSE,
Characteristic.Properties.WRITE
| Characteristic.Properties.WRITE_WITHOUT_RESPONSE,
Characteristic.WRITEABLE,
CharacteristicValue(write=on_audio_control_point_write),
)
self.audio_status_characteristic = Characteristic(
GATT_ASHA_AUDIO_STATUS_CHARACTERISTIC,
Characteristic.READ | Characteristic.NOTIFY,
Characteristic.Properties.READ | Characteristic.Properties.NOTIFY,
Characteristic.READABLE,
bytes([0]),
)
self.volume_characteristic = Characteristic(
GATT_ASHA_VOLUME_CHARACTERISTIC,
Characteristic.WRITE_WITHOUT_RESPONSE,
Characteristic.Properties.WRITE_WITHOUT_RESPONSE,
Characteristic.WRITEABLE,
CharacteristicValue(write=on_volume_write),
)
@@ -151,7 +152,7 @@ class AshaService(TemplateService):
self.psm = self.device.register_l2cap_channel_server(self.psm, on_coc, 8)
self.le_psm_out_characteristic = Characteristic(
GATT_ASHA_LE_PSM_OUT_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
struct.pack('<H', self.psm),
)

View File

@@ -36,7 +36,7 @@ class BatteryService(TemplateService):
self.battery_level_characteristic = PackedCharacteristicAdapter(
Characteristic(
GATT_BATTERY_LEVEL_CHARACTERISTIC,
Characteristic.READ | Characteristic.NOTIFY,
Characteristic.Properties.READ | Characteristic.Properties.NOTIFY,
Characteristic.READABLE,
CharacteristicValue(read=read_battery_level),
),

View File

@@ -63,7 +63,9 @@ class DeviceInformationService(TemplateService):
# TODO: pnp_id
):
characteristics = [
Characteristic(uuid, Characteristic.READ, Characteristic.READABLE, field)
Characteristic(
uuid, Characteristic.Properties.READ, Characteristic.READABLE, field
)
for (field, uuid) in (
(manufacturer_name, GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC),
(model_number, GATT_MODEL_NUMBER_STRING_CHARACTERISTIC),
@@ -79,7 +81,7 @@ class DeviceInformationService(TemplateService):
characteristics.append(
Characteristic(
GATT_SYSTEM_ID_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
self.pack_system_id(*system_id),
)
@@ -89,7 +91,7 @@ class DeviceInformationService(TemplateService):
characteristics.append(
Characteristic(
GATT_REGULATORY_CERTIFICATION_DATA_LIST_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
ieee_regulatory_certification_data_list,
)

View File

@@ -152,7 +152,7 @@ class HeartRateService(TemplateService):
self.heart_rate_measurement_characteristic = DelegatedCharacteristicAdapter(
Characteristic(
GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC,
Characteristic.NOTIFY,
Characteristic.Properties.NOTIFY,
0,
CharacteristicValue(read=read_heart_rate_measurement),
),
@@ -164,7 +164,7 @@ class HeartRateService(TemplateService):
if body_sensor_location is not None:
self.body_sensor_location_characteristic = Characteristic(
GATT_BODY_SENSOR_LOCATION_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
bytes([int(body_sensor_location)]),
)
@@ -182,7 +182,7 @@ class HeartRateService(TemplateService):
self.heart_rate_control_point_characteristic = PackedCharacteristicAdapter(
Characteristic(
GATT_HEART_RATE_CONTROL_POINT_CHARACTERISTIC,
Characteristic.WRITE,
Characteristic.Properties.WRITE,
Characteristic.WRITEABLE,
CharacteristicValue(write=write_heart_rate_control_point_value),
),

View File

@@ -209,7 +209,7 @@ async def keyboard_host(device, peer_address):
return
for i, characteristic in enumerate(report_characteristics):
print(color('REPORT:', 'yellow'), characteristic)
if characteristic.properties & Characteristic.NOTIFY:
if characteristic.properties & Characteristic.Properties.NOTIFY:
await peer.discover_descriptors(characteristic)
report_reference_descriptor = characteristic.get_descriptor(
GATT_REPORT_REFERENCE_DESCRIPTOR
@@ -241,7 +241,9 @@ async def keyboard_device(device, command):
# Create an 'input report' characteristic to send keyboard reports to the host
input_report_characteristic = Characteristic(
GATT_REPORT_CHARACTERISTIC,
Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
Characteristic.Properties.READ
| Characteristic.Properties.WRITE
| Characteristic.Properties.NOTIFY,
Characteristic.READABLE | Characteristic.WRITEABLE,
bytes([0, 0, 0, 0, 0, 0, 0, 0]),
[
@@ -256,8 +258,8 @@ async def keyboard_device(device, command):
# Create an 'output report' characteristic to receive keyboard reports from the host
output_report_characteristic = Characteristic(
GATT_REPORT_CHARACTERISTIC,
Characteristic.READ
| Characteristic.WRITE
Characteristic.Properties.READ
| Characteristic.Properties.WRITE
| Characteristic.WRITE_WITHOUT_RESPONSE,
Characteristic.READABLE | Characteristic.WRITEABLE,
bytes([0]),
@@ -278,7 +280,7 @@ async def keyboard_device(device, command):
[
Characteristic(
GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
'Bumble',
)
@@ -289,13 +291,13 @@ async def keyboard_device(device, command):
[
Characteristic(
GATT_PROTOCOL_MODE_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
bytes([HID_REPORT_PROTOCOL]),
),
Characteristic(
GATT_HID_INFORMATION_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
# bcdHID=1.1, bCountryCode=0x00,
# Flags=RemoteWake|NormallyConnectable
@@ -309,7 +311,7 @@ async def keyboard_device(device, command):
),
Characteristic(
GATT_REPORT_MAP_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
HID_KEYBOARD_REPORT_MAP,
),
@@ -322,7 +324,7 @@ async def keyboard_device(device, command):
[
Characteristic(
GATT_BATTERY_LEVEL_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
bytes([100]),
)

View File

@@ -101,7 +101,7 @@ async def main():
# Add the ASHA service to the GATT server
read_only_properties_characteristic = Characteristic(
ASHA_READ_ONLY_PROPERTIES_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
bytes(
[
@@ -127,13 +127,13 @@ async def main():
)
audio_control_point_characteristic = Characteristic(
ASHA_AUDIO_CONTROL_POINT_CHARACTERISTIC,
Characteristic.WRITE | Characteristic.WRITE_WITHOUT_RESPONSE,
Characteristic.Properties.WRITE | Characteristic.WRITE_WITHOUT_RESPONSE,
Characteristic.WRITEABLE,
CharacteristicValue(write=on_audio_control_point_write),
)
audio_status_characteristic = Characteristic(
ASHA_AUDIO_STATUS_CHARACTERISTIC,
Characteristic.READ | Characteristic.NOTIFY,
Characteristic.Properties.READ | Characteristic.Properties.NOTIFY,
Characteristic.READABLE,
bytes([0]),
)
@@ -145,7 +145,7 @@ async def main():
)
le_psm_out_characteristic = Characteristic(
ASHA_LE_PSM_OUT_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
struct.pack('<H', psm),
)

View File

@@ -80,7 +80,7 @@ async def main():
)
manufacturer_name_characteristic = Characteristic(
GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
"Fitbit",
[descriptor],

View File

@@ -70,7 +70,7 @@ async def main():
)
manufacturer_name_characteristic = Characteristic(
GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
"Fitbit",
[descriptor],

View File

@@ -96,7 +96,7 @@ async def main():
)
manufacturer_name_characteristic = Characteristic(
GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
'Fitbit',
[descriptor],
@@ -109,13 +109,13 @@ async def main():
[
Characteristic(
'D901B45B-4916-412E-ACCA-376ECB603B2C',
Characteristic.READ | Characteristic.WRITE,
Characteristic.Properties.READ | Characteristic.Properties.WRITE,
Characteristic.READABLE | Characteristic.WRITEABLE,
CharacteristicValue(read=my_custom_read, write=my_custom_write),
),
Characteristic(
'552957FB-CF1F-4A31-9535-E78847E1A714',
Characteristic.READ | Characteristic.WRITE,
Characteristic.Properties.READ | Characteristic.Properties.WRITE,
Characteristic.READABLE | Characteristic.WRITEABLE,
CharacteristicValue(
read=my_custom_read_with_error, write=my_custom_write_with_error
@@ -123,7 +123,7 @@ async def main():
),
Characteristic(
'486F64C6-4B5F-4B3B-8AFF-EDE134A8446A',
Characteristic.READ | Characteristic.NOTIFY,
Characteristic.Properties.READ | Characteristic.Properties.NOTIFY,
Characteristic.READABLE,
'hello',
),

View File

@@ -74,19 +74,21 @@ async def main():
# Add a few entries to the device's GATT server
characteristic1 = Characteristic(
'486F64C6-4B5F-4B3B-8AFF-EDE134A8446A',
Characteristic.READ | Characteristic.NOTIFY,
Characteristic.Properties.READ | Characteristic.Properties.NOTIFY,
Characteristic.READABLE,
bytes([0x40]),
)
characteristic2 = Characteristic(
'8EBDEBAE-0017-418E-8D3B-3A3809492165',
Characteristic.READ | Characteristic.INDICATE,
Characteristic.Properties.READ | Characteristic.Properties.INDICATE,
Characteristic.READABLE,
bytes([0x41]),
)
characteristic3 = Characteristic(
'8EBDEBAE-0017-418E-8D3B-3A3809492165',
Characteristic.READ | Characteristic.NOTIFY | Characteristic.INDICATE,
Characteristic.Properties.READ
| Characteristic.Properties.NOTIFY
| Characteristic.Properties.INDICATE,
Characteristic.READABLE,
bytes([0x42]),
)

View File

@@ -115,7 +115,7 @@ async def test_characteristic_encoding():
c = Foo(
GATT_BATTERY_LEVEL_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
123,
)
@@ -144,7 +144,9 @@ async def test_characteristic_encoding():
characteristic = Characteristic(
'FDB159DB-036C-49E3-B3DB-6325AC750806',
Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
Characteristic.Properties.READ
| Characteristic.Properties.WRITE
| Characteristic.Properties.NOTIFY,
Characteristic.READABLE | Characteristic.WRITEABLE,
bytes([123]),
)
@@ -240,7 +242,9 @@ async def test_attribute_getters():
characteristic_uuid = UUID('FDB159DB-036C-49E3-B3DB-6325AC750806')
characteristic = Characteristic(
characteristic_uuid,
Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
Characteristic.Properties.READ
| Characteristic.Properties.WRITE
| Characteristic.Properties.NOTIFY,
Characteristic.READABLE | Characteristic.WRITEABLE,
bytes([123]),
)
@@ -285,7 +289,7 @@ def test_CharacteristicAdapter():
v = bytes([1, 2, 3])
c = Characteristic(
GATT_BATTERY_LEVEL_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
v,
)
@@ -421,7 +425,7 @@ async def test_read_write():
characteristic1 = Characteristic(
'FDB159DB-036C-49E3-B3DB-6325AC750806',
Characteristic.READ | Characteristic.WRITE,
Characteristic.Properties.READ | Characteristic.Properties.WRITE,
Characteristic.READABLE | Characteristic.WRITEABLE,
)
@@ -438,7 +442,7 @@ async def test_read_write():
characteristic2 = Characteristic(
'66DE9057-C848-4ACA-B993-D675644EBB85',
Characteristic.READ | Characteristic.WRITE,
Characteristic.Properties.READ | Characteristic.Properties.WRITE,
Characteristic.READABLE | Characteristic.WRITEABLE,
CharacteristicValue(
read=on_characteristic2_read, write=on_characteristic2_write
@@ -501,7 +505,7 @@ async def test_read_write2():
v = bytes([0x11, 0x22, 0x33, 0x44])
characteristic1 = Characteristic(
'FDB159DB-036C-49E3-B3DB-6325AC750806',
Characteristic.READ | Characteristic.WRITE,
Characteristic.Properties.READ | Characteristic.Properties.WRITE,
Characteristic.READABLE | Characteristic.WRITEABLE,
value=v,
)
@@ -545,7 +549,7 @@ async def test_subscribe_notify():
characteristic1 = Characteristic(
'FDB159DB-036C-49E3-B3DB-6325AC750806',
Characteristic.READ | Characteristic.NOTIFY,
Characteristic.Properties.READ | Characteristic.Properties.NOTIFY,
Characteristic.READABLE,
bytes([1, 2, 3]),
)
@@ -561,7 +565,7 @@ async def test_subscribe_notify():
characteristic2 = Characteristic(
'66DE9057-C848-4ACA-B993-D675644EBB85',
Characteristic.READ | Characteristic.INDICATE,
Characteristic.Properties.READ | Characteristic.Properties.INDICATE,
Characteristic.READABLE,
bytes([4, 5, 6]),
)
@@ -577,7 +581,9 @@ async def test_subscribe_notify():
characteristic3 = Characteristic(
'AB5E639C-40C1-4238-B9CB-AF41F8B806E4',
Characteristic.READ | Characteristic.NOTIFY | Characteristic.INDICATE,
Characteristic.Properties.READ
| Characteristic.Properties.NOTIFY
| Characteristic.Properties.INDICATE,
Characteristic.READABLE,
bytes([7, 8, 9]),
)
@@ -797,32 +803,46 @@ async def test_mtu_exchange():
# -----------------------------------------------------------------------------
def test_char_property_to_string():
# single
assert Characteristic.property_name(0x01) == "BROADCAST"
assert Characteristic.property_name(Characteristic.BROADCAST) == "BROADCAST"
assert str(Characteristic.Properties(0x01)) == "Properties.BROADCAST"
assert str(Characteristic.Properties.BROADCAST) == "Properties.BROADCAST"
# double
assert Characteristic.properties_as_string(0x03) == "BROADCAST,READ"
assert str(Characteristic.Properties(0x03)) == "Properties.READ|BROADCAST"
assert (
Characteristic.properties_as_string(
Characteristic.BROADCAST | Characteristic.READ
)
== "BROADCAST,READ"
str(Characteristic.Properties.BROADCAST | Characteristic.Properties.READ)
== "Properties.READ|BROADCAST"
)
# -----------------------------------------------------------------------------
def test_char_property_string_to_type():
def test_characteristic_property_from_string():
# single
assert Characteristic.string_to_properties("BROADCAST") == Characteristic.BROADCAST
assert (
Characteristic.Properties.from_string("BROADCAST")
== Characteristic.Properties.BROADCAST
)
# double
assert (
Characteristic.string_to_properties("BROADCAST,READ")
== Characteristic.BROADCAST | Characteristic.READ
Characteristic.Properties.from_string("BROADCAST,READ")
== Characteristic.Properties.BROADCAST | Characteristic.Properties.READ
)
assert (
Characteristic.string_to_properties("READ,BROADCAST")
== Characteristic.BROADCAST | Characteristic.READ
Characteristic.Properties.from_string("READ,BROADCAST")
== Characteristic.Properties.BROADCAST | Characteristic.Properties.READ
)
# -----------------------------------------------------------------------------
def test_characteristic_property_from_string_assert():
with pytest.raises(TypeError) as e_info:
Characteristic.Properties.from_string("BROADCAST,HELLO")
assert (
str(e_info.value)
== """Characteristic.Properties::from_string() error:
Expected a string containing any of the keys, separated by commas: BROADCAST,READ,WRITE_WITHOUT_RESPONSE,WRITE,NOTIFY,INDICATE,AUTHENTICATED_SIGNED_WRITES,EXTENDED_PROPERTIES
Got: BROADCAST,HELLO"""
)
@@ -833,7 +853,9 @@ async def test_server_string():
characteristic = Characteristic(
'FDB159DB-036C-49E3-B3DB-6325AC750806',
Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
Characteristic.Properties.READ
| Characteristic.Properties.WRITE
| Characteristic.Properties.NOTIFY,
Characteristic.READABLE | Characteristic.WRITEABLE,
bytes([123]),
)
@@ -844,13 +866,13 @@ async def test_server_string():
assert (
str(server.gatt_server)
== """Service(handle=0x0001, end=0x0005, uuid=UUID-16:1800 (Generic Access))
CharacteristicDeclaration(handle=0x0002, value_handle=0x0003, uuid=UUID-16:2A00 (Device Name), properties=READ)
Characteristic(handle=0x0003, end=0x0003, uuid=UUID-16:2A00 (Device Name), properties=READ)
CharacteristicDeclaration(handle=0x0004, value_handle=0x0005, uuid=UUID-16:2A01 (Appearance), properties=READ)
Characteristic(handle=0x0005, end=0x0005, uuid=UUID-16:2A01 (Appearance), properties=READ)
CharacteristicDeclaration(handle=0x0002, value_handle=0x0003, uuid=UUID-16:2A00 (Device Name), Properties.READ)
Characteristic(handle=0x0003, end=0x0003, uuid=UUID-16:2A00 (Device Name), Properties.READ)
CharacteristicDeclaration(handle=0x0004, value_handle=0x0005, uuid=UUID-16:2A01 (Appearance), Properties.READ)
Characteristic(handle=0x0005, end=0x0005, uuid=UUID-16:2A01 (Appearance), Properties.READ)
Service(handle=0x0006, end=0x0009, uuid=3A657F47-D34F-46B3-B1EC-698E29B6B829)
CharacteristicDeclaration(handle=0x0007, value_handle=0x0008, uuid=FDB159DB-036C-49E3-B3DB-6325AC750806, properties=READ,WRITE,NOTIFY)
Characteristic(handle=0x0008, end=0x0009, uuid=FDB159DB-036C-49E3-B3DB-6325AC750806, properties=READ,WRITE,NOTIFY)
CharacteristicDeclaration(handle=0x0007, value_handle=0x0008, uuid=FDB159DB-036C-49E3-B3DB-6325AC750806, Properties.NOTIFY|WRITE|READ)
Characteristic(handle=0x0008, end=0x0009, uuid=FDB159DB-036C-49E3-B3DB-6325AC750806, Properties.NOTIFY|WRITE|READ)
Descriptor(handle=0x0009, type=UUID-16:2902 (Client Characteristic Configuration), value=0000)"""
)
@@ -875,7 +897,9 @@ def test_attribute_string_to_permissions():
def test_characteristic_permissions():
characteristic = Characteristic(
'FDB159DB-036C-49E3-B3DB-6325AC750806',
Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
Characteristic.Properties.READ
| Characteristic.Properties.WRITE
| Characteristic.Properties.NOTIFY,
'READABLE,WRITEABLE',
)
assert characteristic.permissions == 3
@@ -885,15 +909,21 @@ def test_characteristic_permissions():
def test_characteristic_has_properties():
characteristic = Characteristic(
'FDB159DB-036C-49E3-B3DB-6325AC750806',
Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
Characteristic.Properties.READ
| Characteristic.Properties.WRITE
| Characteristic.Properties.NOTIFY,
'READABLE,WRITEABLE',
)
assert characteristic.has_properties([Characteristic.READ])
assert characteristic.has_properties([Characteristic.READ, Characteristic.WRITE])
assert not characteristic.has_properties(
[Characteristic.READ, Characteristic.WRITE, Characteristic.INDICATE]
assert characteristic.has_properties(Characteristic.Properties.READ)
assert characteristic.has_properties(
Characteristic.Properties.READ | Characteristic.Properties.WRITE
)
assert not characteristic.has_properties([Characteristic.INDICATE])
assert not characteristic.has_properties(
Characteristic.Properties.READ
| Characteristic.Properties.WRITE
| Characteristic.Properties.INDICATE
)
assert not characteristic.has_properties(Characteristic.Properties.INDICATE)
# -----------------------------------------------------------------------------

View File

@@ -163,25 +163,28 @@ async def test_self_gatt():
# Add some GATT characteristics to device 1
c1 = Characteristic(
'3A143AD7-D4A7-436B-97D6-5B62C315E833',
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
bytes([1, 2, 3]),
)
c2 = Characteristic(
'9557CCE2-DB37-46EB-94C4-50AE5B9CB0F8',
Characteristic.READ | Characteristic.WRITE,
Characteristic.Properties.READ | Characteristic.Properties.WRITE,
Characteristic.READABLE | Characteristic.WRITEABLE,
bytes([4, 5, 6]),
)
c3 = Characteristic(
'84FC1A2E-C52D-4A2D-B8C3-8855BAB86638',
Characteristic.READ | Characteristic.WRITE_WITHOUT_RESPONSE,
Characteristic.Properties.READ
| Characteristic.Properties.WRITE_WITHOUT_RESPONSE,
Characteristic.READABLE | Characteristic.WRITEABLE,
bytes([7, 8, 9]),
)
c4 = Characteristic(
'84FC1A2E-C52D-4A2D-B8C3-8855BAB86638',
Characteristic.READ | Characteristic.NOTIFY | Characteristic.INDICATE,
Characteristic.Properties.READ
| Characteristic.Properties.NOTIFY
| Characteristic.Properties.INDICATE,
Characteristic.READABLE,
bytes([1, 1, 1]),
)
@@ -234,7 +237,7 @@ async def test_self_gatt_long_read():
characteristics = [
Characteristic(
f'3A143AD7-D4A7-436B-97D6-5B62C315{i:04X}',
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
bytes([x & 255 for x in range(i)]),
)