diff --git a/.gitignore b/.gitignore index 5cb29bb..be8e77a 100644 --- a/.gitignore +++ b/.gitignore @@ -3,9 +3,6 @@ build/ dist/ *.egg-info/ *~ -bumble/__pycache__ docs/mkdocs/site -tests/__pycache__ test-results.xml -bumble/transport/__pycache__ -bumble/profiles/__pycache__ +__pycache__ diff --git a/bumble/core.py b/bumble/core.py index 302ee6a..e4ae378 100644 --- a/bumble/core.py +++ b/bumble/core.py @@ -58,6 +58,12 @@ def padded_bytes(buffer, size): return buffer + bytes(padding_size) +def get_dict_key_by_value(dictionary, value): + for key, val in dictionary.items(): + if val == value: + return key + return None + # ----------------------------------------------------------------------------- # Exceptions # ----------------------------------------------------------------------------- @@ -135,7 +141,7 @@ class UUID: else: uuid_str = uuid_str_or_int if len(uuid_str) != 32 and len(uuid_str) != 8 and len(uuid_str) != 4: - raise ValueError('invalid UUID format') + raise ValueError(f"invalid UUID format: {uuid_str}") self.uuid_bytes = bytes(reversed(bytes.fromhex(uuid_str))) self.name = name diff --git a/bumble/device.py b/bumble/device.py index 300afd2..c2fbc52 100644 --- a/bumble/device.py +++ b/bumble/device.py @@ -540,6 +540,7 @@ class DeviceConfiguration: ) self.irk = bytes(16) # This really must be changed for any level of security self.keystore = None + self.gatt_services = [] def load_from_dict(self, config): # Load simple properties @@ -556,6 +557,7 @@ class DeviceConfiguration: self.classic_accept_any = config.get('classic_accept_any', self.classic_accept_any) self.connectable = config.get('connectable', self.connectable) self.discoverable = config.get('discoverable', self.discoverable) + self.gatt_services = config.get('gatt_services', self.gatt_services) # Load or synthesize an IRK irk = config.get('irk') @@ -589,7 +591,7 @@ def with_connection_from_handle(function): @functools.wraps(function) def wrapper(self, connection_handle, *args, **kwargs): if (connection := self.lookup_connection(connection_handle)) is None: - raise ValueError('no connection for handle') + raise ValueError(f"no connection for handle: 0x{connection_handle:04x}") return function(self, connection, *args, **kwargs) return wrapper @@ -726,6 +728,26 @@ class Device(CompositeEventEmitter): self.connectable = config.connectable self.classic_accept_any = config.classic_accept_any + for service in config.gatt_services: + characteristics = [] + for characteristic in service.get("characteristics", []): + descriptors = [] + for descriptor in characteristic.get("descriptors", []): + new_descriptor = Descriptor( + descriptor_type=descriptor["descriptor_type"], + permissions=descriptor["permission"], + ) + descriptors.append(new_descriptor) + new_characteristic = Characteristic( + uuid=characteristic["uuid"], + properties=characteristic["properties"], + permissions=int(characteristic["permissions"], 0), + descriptors=descriptors, + ) + characteristics.append(new_characteristic) + new_service = Service(uuid=service["uuid"], characteristics=characteristics) + self.gatt_server.add_service(new_service) + # If a name is passed, override the name from the config if name: self.name = name diff --git a/bumble/gatt.py b/bumble/gatt.py index 889eaa4..9bfe901 100644 --- a/bumble/gatt.py +++ b/bumble/gatt.py @@ -22,6 +22,7 @@ # ----------------------------------------------------------------------------- # Imports # ----------------------------------------------------------------------------- +from __future__ import annotations import asyncio import enum import types @@ -187,7 +188,7 @@ class Service(Attribute): See Vol 3, Part G - 3.1 SERVICE DEFINITION ''' - def __init__(self, uuid, characteristics, primary=True): + def __init__(self, uuid, characteristics: list[Characteristic], primary=True): # Convert the uuid to a UUID object if it isn't already if type(uuid) is str: uuid = UUID(uuid) @@ -256,10 +257,21 @@ class Characteristic(Attribute): if properties & p ]) - def __init__(self, uuid, properties, permissions, value = b'', descriptors = []): + @staticmethod + def string_to_properties(properties_str: str): + return functools.reduce( + lambda x, y: x | get_dict_key_by_value(Characteristic.PROPERTY_NAMES, y), + properties_str.split(","), + 0, + ) + + def __init__(self, uuid, properties, permissions, value = b'', descriptors: list[Descriptor] = []): super().__init__(uuid, permissions, value) self.uuid = self.type - self.properties = properties + if type(properties) is str: + self.properties = Characteristic.string_to_properties(properties) + else: + self.properties = properties self.descriptors = descriptors def get_descriptor(self, descriptor_type): diff --git a/bumble/gatt_server.py b/bumble/gatt_server.py index 5b6fea3..4d2a798 100644 --- a/bumble/gatt_server.py +++ b/bumble/gatt_server.py @@ -60,6 +60,9 @@ class Server(EventEmitter): self.indication_semaphores = defaultdict(lambda: asyncio.Semaphore(1)) self.pending_confirmations = defaultdict(lambda: None) + def __str__(self): + return "\n".join(map(str, self.attributes)) + def send_gatt_pdu(self, connection_handle, pdu): self.device.send_l2cap_pdu(connection_handle, ATT_CID, pdu) @@ -87,7 +90,7 @@ class Server(EventEmitter): # Add this attribute to the list self.attributes.append(attribute) - def add_service(self, service): + def add_service(self, service: Service): # Add the service attribute to the DB self.add_attribute(service) diff --git a/tasks.py b/tasks.py index ddba8cd..c361e94 100644 --- a/tasks.py +++ b/tasks.py @@ -52,8 +52,9 @@ build_tasks.add_task(mkdocs, name="mkdocs") test_tasks = Collection() ns.add_collection(test_tasks, name="test") -@task -def test(ctx, filter=None, junit=False, install=False, html=False): + +@task(incrementable=["verbose"]) +def test(ctx, filter=None, junit=False, install=False, html=False, verbose=0): # Install the package before running the tests if install: ctx.run("python -m pip install .[test]") @@ -62,10 +63,12 @@ def test(ctx, filter=None, junit=False, install=False, html=False): if junit: args += "--junit-xml test-results.xml" if filter is not None: - args += " -k '{}'".format(filter) + args += f" -k '{filter}'" if html: - args += "--html results.html" - ctx.run("python -m pytest {} {}".format(os.path.join(ROOT_DIR, "tests"), args)) + args += " --html results.html" + if verbose > 0: + args += f" -{'v' * verbose}" + ctx.run(f"python -m pytest {os.path.join(ROOT_DIR, 'tests')} {args}") test_tasks.add_task(test, default=True) diff --git a/tests/core_test.py b/tests/core_test.py index fa397db..226d0bf 100644 --- a/tests/core_test.py +++ b/tests/core_test.py @@ -15,8 +15,7 @@ # ----------------------------------------------------------------------------- # Imports # ----------------------------------------------------------------------------- -from bumble.core import AdvertisingData - +from bumble.core import AdvertisingData, get_dict_key_by_value # ----------------------------------------------------------------------------- def test_ad_data(): @@ -39,6 +38,16 @@ def test_ad_data(): assert(ad.get(AdvertisingData.TX_POWER_LEVEL, return_all=True, raw=True) == [bytes([123]), bytes([234])]) +# ----------------------------------------------------------------------------- +def test_get_dict_key_by_value(): + dictionary = { + "A": 1, + "B": 2 + } + assert get_dict_key_by_value(dictionary, 1) == "A" + assert get_dict_key_by_value(dictionary, 2) == "B" + assert get_dict_key_by_value(dictionary, 3) is None + # ----------------------------------------------------------------------------- if __name__ == '__main__': test_ad_data() \ No newline at end of file diff --git a/tests/gatt_test.py b/tests/gatt_test.py index 927aeee..c111e82 100644 --- a/tests/gatt_test.py +++ b/tests/gatt_test.py @@ -705,6 +705,55 @@ async def test_mtu_exchange(): assert d2_connection.att_mtu == 50 +# ----------------------------------------------------------------------------- +def test_char_property_to_string(): + # single + assert Characteristic.property_name(0x01) == "BROADCAST" + assert Characteristic.property_name(Characteristic.BROADCAST) == "BROADCAST" + + # double + assert Characteristic.properties_as_string(0x03) == "BROADCAST,READ" + assert Characteristic.properties_as_string(Characteristic.BROADCAST | Characteristic.READ) == "BROADCAST,READ" + + +# ----------------------------------------------------------------------------- +def test_char_property_string_to_type(): + # single + assert Characteristic.string_to_properties("BROADCAST") == Characteristic.BROADCAST + + # double + assert Characteristic.string_to_properties("BROADCAST,READ") == Characteristic.BROADCAST | Characteristic.READ + assert Characteristic.string_to_properties("READ,BROADCAST") == Characteristic.BROADCAST | Characteristic.READ + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_server_string(): + [_, server] = LinkedDevices().devices[:2] + + characteristic = Characteristic( + 'FDB159DB-036C-49E3-B3DB-6325AC750806', + Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY, + Characteristic.READABLE | Characteristic.WRITEABLE, + bytes([123]) + ) + + service = Service( + '3A657F47-D34F-46B3-B1EC-698E29B6B829', + [characteristic] + ) + server.add_service(service) + + assert str(server.gatt_server) == """Service(handle=0x0001, end=0x0005, uuid=UUID-16:1800 (Generic Access)) +Attribute(handle=0x0002, type=UUID-16:2803 (Characteristic), permissions=1, value=020300002a) +Characteristic(handle=0x0003, end=0x0003, uuid=UUID-16:2A00 (Device Name), properties=READ) +Attribute(handle=0x0004, type=UUID-16:2803 (Characteristic), permissions=1, value=020500012a) +Characteristic(handle=0x0005, end=0x0005, uuid=UUID-16:2A01 (Appearance), properties=READ) +Service(handle=0x0006, end=0x0009, uuid=3A657F47-D34F-46B3-B1EC-698E29B6B829) +Attribute(handle=0x0007, type=UUID-16:2803 (Characteristic), permissions=1, value=1a0800060875ac2563dbb3e3496c03db59b1fd) +Characteristic(handle=0x0008, end=0x0009, uuid=FDB159DB-036C-49E3-B3DB-6325AC750806, properties=READ,WRITE,NOTIFY) +Descriptor(handle=0x0009, type=UUID-16:2902 (Client Characteristic Configuration), value=0000)""" + # ----------------------------------------------------------------------------- async def async_main(): await test_read_write()