forked from auracaster/bumble_mirror
Merge pull request #85 from AlanRosenthal/alan/gatt_server_console2
Add `bumble-console --device-config` support for gatt services
This commit is contained in:
5
.gitignore
vendored
5
.gitignore
vendored
@@ -3,9 +3,6 @@ build/
|
|||||||
dist/
|
dist/
|
||||||
*.egg-info/
|
*.egg-info/
|
||||||
*~
|
*~
|
||||||
bumble/__pycache__
|
|
||||||
docs/mkdocs/site
|
docs/mkdocs/site
|
||||||
tests/__pycache__
|
|
||||||
test-results.xml
|
test-results.xml
|
||||||
bumble/transport/__pycache__
|
__pycache__
|
||||||
bumble/profiles/__pycache__
|
|
||||||
|
|||||||
@@ -58,6 +58,12 @@ def padded_bytes(buffer, size):
|
|||||||
return buffer + bytes(padding_size)
|
return buffer + bytes(padding_size)
|
||||||
|
|
||||||
|
|
||||||
|
def get_dict_key_by_value(dictionary, value):
|
||||||
|
for key, val in dictionary.items():
|
||||||
|
if val == value:
|
||||||
|
return key
|
||||||
|
return None
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
# Exceptions
|
# Exceptions
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
@@ -135,7 +141,7 @@ class UUID:
|
|||||||
else:
|
else:
|
||||||
uuid_str = uuid_str_or_int
|
uuid_str = uuid_str_or_int
|
||||||
if len(uuid_str) != 32 and len(uuid_str) != 8 and len(uuid_str) != 4:
|
if len(uuid_str) != 32 and len(uuid_str) != 8 and len(uuid_str) != 4:
|
||||||
raise ValueError('invalid UUID format')
|
raise ValueError(f"invalid UUID format: {uuid_str}")
|
||||||
self.uuid_bytes = bytes(reversed(bytes.fromhex(uuid_str)))
|
self.uuid_bytes = bytes(reversed(bytes.fromhex(uuid_str)))
|
||||||
self.name = name
|
self.name = name
|
||||||
|
|
||||||
|
|||||||
@@ -540,6 +540,7 @@ class DeviceConfiguration:
|
|||||||
)
|
)
|
||||||
self.irk = bytes(16) # This really must be changed for any level of security
|
self.irk = bytes(16) # This really must be changed for any level of security
|
||||||
self.keystore = None
|
self.keystore = None
|
||||||
|
self.gatt_services = []
|
||||||
|
|
||||||
def load_from_dict(self, config):
|
def load_from_dict(self, config):
|
||||||
# Load simple properties
|
# Load simple properties
|
||||||
@@ -556,6 +557,7 @@ class DeviceConfiguration:
|
|||||||
self.classic_accept_any = config.get('classic_accept_any', self.classic_accept_any)
|
self.classic_accept_any = config.get('classic_accept_any', self.classic_accept_any)
|
||||||
self.connectable = config.get('connectable', self.connectable)
|
self.connectable = config.get('connectable', self.connectable)
|
||||||
self.discoverable = config.get('discoverable', self.discoverable)
|
self.discoverable = config.get('discoverable', self.discoverable)
|
||||||
|
self.gatt_services = config.get('gatt_services', self.gatt_services)
|
||||||
|
|
||||||
# Load or synthesize an IRK
|
# Load or synthesize an IRK
|
||||||
irk = config.get('irk')
|
irk = config.get('irk')
|
||||||
@@ -589,7 +591,7 @@ def with_connection_from_handle(function):
|
|||||||
@functools.wraps(function)
|
@functools.wraps(function)
|
||||||
def wrapper(self, connection_handle, *args, **kwargs):
|
def wrapper(self, connection_handle, *args, **kwargs):
|
||||||
if (connection := self.lookup_connection(connection_handle)) is None:
|
if (connection := self.lookup_connection(connection_handle)) is None:
|
||||||
raise ValueError('no connection for handle')
|
raise ValueError(f"no connection for handle: 0x{connection_handle:04x}")
|
||||||
return function(self, connection, *args, **kwargs)
|
return function(self, connection, *args, **kwargs)
|
||||||
return wrapper
|
return wrapper
|
||||||
|
|
||||||
@@ -726,6 +728,26 @@ class Device(CompositeEventEmitter):
|
|||||||
self.connectable = config.connectable
|
self.connectable = config.connectable
|
||||||
self.classic_accept_any = config.classic_accept_any
|
self.classic_accept_any = config.classic_accept_any
|
||||||
|
|
||||||
|
for service in config.gatt_services:
|
||||||
|
characteristics = []
|
||||||
|
for characteristic in service.get("characteristics", []):
|
||||||
|
descriptors = []
|
||||||
|
for descriptor in characteristic.get("descriptors", []):
|
||||||
|
new_descriptor = Descriptor(
|
||||||
|
descriptor_type=descriptor["descriptor_type"],
|
||||||
|
permissions=descriptor["permission"],
|
||||||
|
)
|
||||||
|
descriptors.append(new_descriptor)
|
||||||
|
new_characteristic = Characteristic(
|
||||||
|
uuid=characteristic["uuid"],
|
||||||
|
properties=characteristic["properties"],
|
||||||
|
permissions=int(characteristic["permissions"], 0),
|
||||||
|
descriptors=descriptors,
|
||||||
|
)
|
||||||
|
characteristics.append(new_characteristic)
|
||||||
|
new_service = Service(uuid=service["uuid"], characteristics=characteristics)
|
||||||
|
self.gatt_server.add_service(new_service)
|
||||||
|
|
||||||
# If a name is passed, override the name from the config
|
# If a name is passed, override the name from the config
|
||||||
if name:
|
if name:
|
||||||
self.name = name
|
self.name = name
|
||||||
|
|||||||
@@ -22,6 +22,7 @@
|
|||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
# Imports
|
# Imports
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
|
from __future__ import annotations
|
||||||
import asyncio
|
import asyncio
|
||||||
import enum
|
import enum
|
||||||
import types
|
import types
|
||||||
@@ -187,7 +188,7 @@ class Service(Attribute):
|
|||||||
See Vol 3, Part G - 3.1 SERVICE DEFINITION
|
See Vol 3, Part G - 3.1 SERVICE DEFINITION
|
||||||
'''
|
'''
|
||||||
|
|
||||||
def __init__(self, uuid, characteristics, primary=True):
|
def __init__(self, uuid, characteristics: list[Characteristic], primary=True):
|
||||||
# Convert the uuid to a UUID object if it isn't already
|
# Convert the uuid to a UUID object if it isn't already
|
||||||
if type(uuid) is str:
|
if type(uuid) is str:
|
||||||
uuid = UUID(uuid)
|
uuid = UUID(uuid)
|
||||||
@@ -256,10 +257,21 @@ class Characteristic(Attribute):
|
|||||||
if properties & p
|
if properties & p
|
||||||
])
|
])
|
||||||
|
|
||||||
def __init__(self, uuid, properties, permissions, value = b'', descriptors = []):
|
@staticmethod
|
||||||
|
def string_to_properties(properties_str: str):
|
||||||
|
return functools.reduce(
|
||||||
|
lambda x, y: x | get_dict_key_by_value(Characteristic.PROPERTY_NAMES, y),
|
||||||
|
properties_str.split(","),
|
||||||
|
0,
|
||||||
|
)
|
||||||
|
|
||||||
|
def __init__(self, uuid, properties, permissions, value = b'', descriptors: list[Descriptor] = []):
|
||||||
super().__init__(uuid, permissions, value)
|
super().__init__(uuid, permissions, value)
|
||||||
self.uuid = self.type
|
self.uuid = self.type
|
||||||
self.properties = properties
|
if type(properties) is str:
|
||||||
|
self.properties = Characteristic.string_to_properties(properties)
|
||||||
|
else:
|
||||||
|
self.properties = properties
|
||||||
self.descriptors = descriptors
|
self.descriptors = descriptors
|
||||||
|
|
||||||
def get_descriptor(self, descriptor_type):
|
def get_descriptor(self, descriptor_type):
|
||||||
|
|||||||
@@ -60,6 +60,9 @@ class Server(EventEmitter):
|
|||||||
self.indication_semaphores = defaultdict(lambda: asyncio.Semaphore(1))
|
self.indication_semaphores = defaultdict(lambda: asyncio.Semaphore(1))
|
||||||
self.pending_confirmations = defaultdict(lambda: None)
|
self.pending_confirmations = defaultdict(lambda: None)
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return "\n".join(map(str, self.attributes))
|
||||||
|
|
||||||
def send_gatt_pdu(self, connection_handle, pdu):
|
def send_gatt_pdu(self, connection_handle, pdu):
|
||||||
self.device.send_l2cap_pdu(connection_handle, ATT_CID, pdu)
|
self.device.send_l2cap_pdu(connection_handle, ATT_CID, pdu)
|
||||||
|
|
||||||
@@ -87,7 +90,7 @@ class Server(EventEmitter):
|
|||||||
# Add this attribute to the list
|
# Add this attribute to the list
|
||||||
self.attributes.append(attribute)
|
self.attributes.append(attribute)
|
||||||
|
|
||||||
def add_service(self, service):
|
def add_service(self, service: Service):
|
||||||
# Add the service attribute to the DB
|
# Add the service attribute to the DB
|
||||||
self.add_attribute(service)
|
self.add_attribute(service)
|
||||||
|
|
||||||
|
|||||||
13
tasks.py
13
tasks.py
@@ -52,8 +52,9 @@ build_tasks.add_task(mkdocs, name="mkdocs")
|
|||||||
test_tasks = Collection()
|
test_tasks = Collection()
|
||||||
ns.add_collection(test_tasks, name="test")
|
ns.add_collection(test_tasks, name="test")
|
||||||
|
|
||||||
@task
|
|
||||||
def test(ctx, filter=None, junit=False, install=False, html=False):
|
@task(incrementable=["verbose"])
|
||||||
|
def test(ctx, filter=None, junit=False, install=False, html=False, verbose=0):
|
||||||
# Install the package before running the tests
|
# Install the package before running the tests
|
||||||
if install:
|
if install:
|
||||||
ctx.run("python -m pip install .[test]")
|
ctx.run("python -m pip install .[test]")
|
||||||
@@ -62,10 +63,12 @@ def test(ctx, filter=None, junit=False, install=False, html=False):
|
|||||||
if junit:
|
if junit:
|
||||||
args += "--junit-xml test-results.xml"
|
args += "--junit-xml test-results.xml"
|
||||||
if filter is not None:
|
if filter is not None:
|
||||||
args += " -k '{}'".format(filter)
|
args += f" -k '{filter}'"
|
||||||
if html:
|
if html:
|
||||||
args += "--html results.html"
|
args += " --html results.html"
|
||||||
ctx.run("python -m pytest {} {}".format(os.path.join(ROOT_DIR, "tests"), args))
|
if verbose > 0:
|
||||||
|
args += f" -{'v' * verbose}"
|
||||||
|
ctx.run(f"python -m pytest {os.path.join(ROOT_DIR, 'tests')} {args}")
|
||||||
|
|
||||||
test_tasks.add_task(test, default=True)
|
test_tasks.add_task(test, default=True)
|
||||||
|
|
||||||
|
|||||||
@@ -15,8 +15,7 @@
|
|||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
# Imports
|
# Imports
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
from bumble.core import AdvertisingData
|
from bumble.core import AdvertisingData, get_dict_key_by_value
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
def test_ad_data():
|
def test_ad_data():
|
||||||
@@ -39,6 +38,16 @@ def test_ad_data():
|
|||||||
assert(ad.get(AdvertisingData.TX_POWER_LEVEL, return_all=True, raw=True) == [bytes([123]), bytes([234])])
|
assert(ad.get(AdvertisingData.TX_POWER_LEVEL, return_all=True, raw=True) == [bytes([123]), bytes([234])])
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
def test_get_dict_key_by_value():
|
||||||
|
dictionary = {
|
||||||
|
"A": 1,
|
||||||
|
"B": 2
|
||||||
|
}
|
||||||
|
assert get_dict_key_by_value(dictionary, 1) == "A"
|
||||||
|
assert get_dict_key_by_value(dictionary, 2) == "B"
|
||||||
|
assert get_dict_key_by_value(dictionary, 3) is None
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
test_ad_data()
|
test_ad_data()
|
||||||
@@ -705,6 +705,55 @@ async def test_mtu_exchange():
|
|||||||
assert d2_connection.att_mtu == 50
|
assert d2_connection.att_mtu == 50
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
def test_char_property_to_string():
|
||||||
|
# single
|
||||||
|
assert Characteristic.property_name(0x01) == "BROADCAST"
|
||||||
|
assert Characteristic.property_name(Characteristic.BROADCAST) == "BROADCAST"
|
||||||
|
|
||||||
|
# double
|
||||||
|
assert Characteristic.properties_as_string(0x03) == "BROADCAST,READ"
|
||||||
|
assert Characteristic.properties_as_string(Characteristic.BROADCAST | Characteristic.READ) == "BROADCAST,READ"
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
def test_char_property_string_to_type():
|
||||||
|
# single
|
||||||
|
assert Characteristic.string_to_properties("BROADCAST") == Characteristic.BROADCAST
|
||||||
|
|
||||||
|
# double
|
||||||
|
assert Characteristic.string_to_properties("BROADCAST,READ") == Characteristic.BROADCAST | Characteristic.READ
|
||||||
|
assert Characteristic.string_to_properties("READ,BROADCAST") == Characteristic.BROADCAST | Characteristic.READ
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_server_string():
|
||||||
|
[_, server] = LinkedDevices().devices[:2]
|
||||||
|
|
||||||
|
characteristic = Characteristic(
|
||||||
|
'FDB159DB-036C-49E3-B3DB-6325AC750806',
|
||||||
|
Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
|
||||||
|
Characteristic.READABLE | Characteristic.WRITEABLE,
|
||||||
|
bytes([123])
|
||||||
|
)
|
||||||
|
|
||||||
|
service = Service(
|
||||||
|
'3A657F47-D34F-46B3-B1EC-698E29B6B829',
|
||||||
|
[characteristic]
|
||||||
|
)
|
||||||
|
server.add_service(service)
|
||||||
|
|
||||||
|
assert str(server.gatt_server) == """Service(handle=0x0001, end=0x0005, uuid=UUID-16:1800 (Generic Access))
|
||||||
|
Attribute(handle=0x0002, type=UUID-16:2803 (Characteristic), permissions=1, value=020300002a)
|
||||||
|
Characteristic(handle=0x0003, end=0x0003, uuid=UUID-16:2A00 (Device Name), properties=READ)
|
||||||
|
Attribute(handle=0x0004, type=UUID-16:2803 (Characteristic), permissions=1, value=020500012a)
|
||||||
|
Characteristic(handle=0x0005, end=0x0005, uuid=UUID-16:2A01 (Appearance), properties=READ)
|
||||||
|
Service(handle=0x0006, end=0x0009, uuid=3A657F47-D34F-46B3-B1EC-698E29B6B829)
|
||||||
|
Attribute(handle=0x0007, type=UUID-16:2803 (Characteristic), permissions=1, value=1a0800060875ac2563dbb3e3496c03db59b1fd)
|
||||||
|
Characteristic(handle=0x0008, end=0x0009, uuid=FDB159DB-036C-49E3-B3DB-6325AC750806, properties=READ,WRITE,NOTIFY)
|
||||||
|
Descriptor(handle=0x0009, type=UUID-16:2902 (Client Characteristic Configuration), value=0000)"""
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
async def async_main():
|
async def async_main():
|
||||||
await test_read_write()
|
await test_read_write()
|
||||||
|
|||||||
Reference in New Issue
Block a user