forked from auracaster/bumble_mirror
431 lines
16 KiB
Python
431 lines
16 KiB
Python
# Copyright 2025 Google LLC
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# https://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Imports
|
|
# -----------------------------------------------------------------------------
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import dataclasses
|
|
import enum
|
|
import functools
|
|
import random
|
|
import struct
|
|
import sys
|
|
from typing import Any, Union
|
|
|
|
import bumble.logging
|
|
from bumble import core, gatt, gatt_adapters, gatt_client, hci, transport
|
|
from bumble.device import Device, Peer
|
|
|
|
# -----------------------------------------------------------------------------
|
|
SERVICE_UUID = core.UUID("50DB505C-8AC4-4738-8448-3B1D9CC09CC5")
|
|
CHARACTERISTIC_UUID_BASE = "D901B45B-4916-412E-ACCA-0000000000"
|
|
|
|
DEFAULT_CLIENT_ADDRESS = "F0:F1:F2:F3:F4:F5"
|
|
DEFAULT_SERVER_ADDRESS = "F1:F2:F3:F4:F5:F6"
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
@dataclasses.dataclass
|
|
class CustomSerializableClass:
|
|
x: int
|
|
y: int
|
|
|
|
@classmethod
|
|
def from_bytes(cls, data: bytes) -> CustomSerializableClass:
|
|
return cls(*struct.unpack(">II", data))
|
|
|
|
def __bytes__(self) -> bytes:
|
|
return struct.pack(">II", self.x, self.y)
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
@dataclasses.dataclass
|
|
class CustomClass:
|
|
a: int
|
|
b: int
|
|
|
|
@classmethod
|
|
def decode(cls, data: bytes) -> CustomClass:
|
|
return cls(*struct.unpack(">II", data))
|
|
|
|
def encode(self) -> bytes:
|
|
return struct.pack(">II", self.a, self.b)
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
class CustomEnum(enum.IntEnum):
|
|
FOO = 1234
|
|
BAR = 5678
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
async def client(device: Device, address: hci.Address) -> None:
|
|
print(f'=== Connecting to {address}...')
|
|
connection = await device.connect(address)
|
|
print('=== Connected')
|
|
|
|
# Discover all characteristics.
|
|
peer = Peer(connection)
|
|
print("*** Discovering services and characteristics...")
|
|
await peer.discover_all()
|
|
print("*** Discovery complete")
|
|
|
|
service = peer.get_services_by_uuid(SERVICE_UUID)[0]
|
|
characteristics: list[gatt_client.CharacteristicProxy] = []
|
|
for index in range(1, 10):
|
|
characteristics.append(
|
|
service.get_characteristics_by_uuid(
|
|
core.UUID(CHARACTERISTIC_UUID_BASE + f"{index:02X}")
|
|
)[0]
|
|
)
|
|
|
|
# Read all characteristics as raw bytes.
|
|
for characteristic in characteristics:
|
|
value = await characteristic.read_value()
|
|
print(f"### {characteristic} = {value!r} ({value.hex()})")
|
|
|
|
# Subscribe to all characteristics as a raw bytes listener.
|
|
def on_raw_characteristic_update(characteristic, value):
|
|
print(f"^^^ Update[RAW] {characteristic.uuid} value = {value.hex()}")
|
|
|
|
for characteristic in characteristics:
|
|
await characteristic.subscribe(
|
|
functools.partial(on_raw_characteristic_update, characteristic)
|
|
)
|
|
|
|
# Function to subscribe to adapted characteristics
|
|
def on_adapted_characteristic_update(characteristic, value):
|
|
print(
|
|
f"^^^ Update[ADAPTED] {characteristic.uuid} value = {value!r}, "
|
|
f"type={type(value)}"
|
|
)
|
|
|
|
# Static characteristic with a bytes value.
|
|
c1 = characteristics[0]
|
|
c1_value = await c1.read_value()
|
|
print(f"@@@ C1 {c1} value = {c1_value!r} (type={type(c1_value)})")
|
|
await c1.write_value("happy π day".encode("utf-8"))
|
|
await c1.subscribe(functools.partial(on_adapted_characteristic_update, c1))
|
|
|
|
# Static characteristic with a string value.
|
|
c2 = gatt_adapters.UTF8CharacteristicProxyAdapter(characteristics[1])
|
|
c2_value = await c2.read_value()
|
|
print(f"@@@ C2 {c2} value = {c2_value} (type={type(c2_value)})")
|
|
await c2.write_value("happy π day")
|
|
await c2.subscribe(functools.partial(on_adapted_characteristic_update, c2))
|
|
|
|
# Static characteristic with a tuple value.
|
|
c3 = gatt_adapters.PackedCharacteristicProxyAdapter(characteristics[2], ">III")
|
|
c3_value = await c3.read_value()
|
|
print(f"@@@ C3 {c3} value = {c3_value} (type={type(c3_value)})")
|
|
await c3.write_value((2001, 2002, 2003))
|
|
await c3.subscribe(functools.partial(on_adapted_characteristic_update, c3))
|
|
|
|
# Static characteristic with a named tuple value.
|
|
c4 = gatt_adapters.MappedCharacteristicProxyAdapter(
|
|
characteristics[3], ">III", ["f1", "f2", "f3"]
|
|
)
|
|
c4_value = await c4.read_value()
|
|
print(f"@@@ C4 {c4} value = {c4_value} (type={type(c4_value)})")
|
|
await c4.write_value({"f1": 4001, "f2": 4002, "f3": 4003})
|
|
await c4.subscribe(functools.partial(on_adapted_characteristic_update, c4))
|
|
|
|
# Static characteristic with a serializable value.
|
|
c5 = gatt_adapters.SerializableCharacteristicProxyAdapter(
|
|
characteristics[4], CustomSerializableClass
|
|
)
|
|
c5_value = await c5.read_value()
|
|
print(f"@@@ C5 {c5} value = {c5_value} (type={type(c5_value)})")
|
|
await c5.write_value(CustomSerializableClass(56, 57))
|
|
await c5.subscribe(functools.partial(on_adapted_characteristic_update, c5))
|
|
|
|
# Static characteristic with a delegated value.
|
|
c6 = gatt_adapters.DelegatedCharacteristicProxyAdapter(
|
|
characteristics[5], encode=CustomClass.encode, decode=CustomClass.decode
|
|
)
|
|
c6_value = await c6.read_value()
|
|
print(f"@@@ C6 {c6} value = {c6_value} (type={type(c6_value)})")
|
|
await c6.write_value(CustomClass(6, 7))
|
|
await c6.subscribe(functools.partial(on_adapted_characteristic_update, c6))
|
|
|
|
# Dynamic characteristic with a bytes value.
|
|
c7 = characteristics[6]
|
|
c7_value = await c7.read_value()
|
|
print(f"@@@ C7 {c7} value = {c7_value!r} (type={type(c7_value)})")
|
|
await c7.write_value(bytes.fromhex("01020304"))
|
|
await c7.subscribe(functools.partial(on_adapted_characteristic_update, c7))
|
|
|
|
# Dynamic characteristic with a string value.
|
|
c8 = gatt_adapters.UTF8CharacteristicProxyAdapter(characteristics[7])
|
|
c8_value = await c8.read_value()
|
|
print(f"@@@ C8 {c8} value = {c8_value} (type={type(c8_value)})")
|
|
await c8.write_value("howdy")
|
|
await c8.subscribe(functools.partial(on_adapted_characteristic_update, c8))
|
|
|
|
# Static characteristic with an enum value
|
|
c9 = gatt_adapters.EnumCharacteristicProxyAdapter(
|
|
characteristics[8], CustomEnum, 3, 'big'
|
|
)
|
|
c9_value = await c9.read_value()
|
|
print(f"@@@ C9 {c9} value = {c9_value.name} (type={type(c9_value)})")
|
|
await c9.write_value(CustomEnum.BAR)
|
|
await c9.subscribe(functools.partial(on_adapted_characteristic_update, c9))
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
def dynamic_read(selector: str) -> Union[bytes, str]:
|
|
if selector == "bytes":
|
|
print("$$$ Returning random bytes")
|
|
return random.randbytes(7)
|
|
elif selector == "string":
|
|
print("$$$ Returning random string")
|
|
return random.randbytes(7).hex()
|
|
|
|
raise ValueError("invalid selector")
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
def dynamic_write(selector: str, value: Any) -> None:
|
|
print(f"$$$ Received[{selector}]: {value} (type={type(value)})")
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
def on_characteristic_read(characteristic: gatt.Characteristic, value: Any) -> None:
|
|
"""Event listener invoked when a characteristic is read."""
|
|
print(f"<<< READ: {characteristic} -> {value} ({type(value)})")
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
def on_characteristic_write(characteristic: gatt.Characteristic, value: Any) -> None:
|
|
"""Event listener invoked when a characteristic is written."""
|
|
print(f"<<< WRITE: {characteristic} <- {value} ({type(value)})")
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
async def server(device: Device) -> None:
|
|
# Static characteristic with a bytes value.
|
|
c1 = gatt.Characteristic(
|
|
CHARACTERISTIC_UUID_BASE + "01",
|
|
gatt.Characteristic.Properties.READ
|
|
| gatt.Characteristic.Properties.WRITE
|
|
| gatt.Characteristic.Properties.NOTIFY,
|
|
gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE,
|
|
b'hello',
|
|
)
|
|
|
|
# Static characteristic with a string value.
|
|
c2 = gatt_adapters.UTF8CharacteristicAdapter(
|
|
gatt.Characteristic(
|
|
CHARACTERISTIC_UUID_BASE + "02",
|
|
gatt.Characteristic.Properties.READ
|
|
| gatt.Characteristic.Properties.WRITE
|
|
| gatt.Characteristic.Properties.NOTIFY,
|
|
gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE,
|
|
'hello',
|
|
)
|
|
)
|
|
|
|
# Static characteristic with a tuple value.
|
|
c3 = gatt_adapters.PackedCharacteristicAdapter(
|
|
gatt.Characteristic(
|
|
CHARACTERISTIC_UUID_BASE + "03",
|
|
gatt.Characteristic.Properties.READ
|
|
| gatt.Characteristic.Properties.WRITE
|
|
| gatt.Characteristic.Properties.NOTIFY,
|
|
gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE,
|
|
(1007, 1008, 1009),
|
|
),
|
|
">III",
|
|
)
|
|
|
|
# Static characteristic with a named tuple value.
|
|
c4 = gatt_adapters.MappedCharacteristicAdapter(
|
|
gatt.Characteristic(
|
|
CHARACTERISTIC_UUID_BASE + "04",
|
|
gatt.Characteristic.Properties.READ
|
|
| gatt.Characteristic.Properties.WRITE
|
|
| gatt.Characteristic.Properties.NOTIFY,
|
|
gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE,
|
|
{"f1": 3007, "f2": 3008, "f3": 3009},
|
|
),
|
|
">III",
|
|
["f1", "f2", "f3"],
|
|
)
|
|
|
|
# Static characteristic with a serializable value.
|
|
c5 = gatt_adapters.SerializableCharacteristicAdapter(
|
|
gatt.Characteristic(
|
|
CHARACTERISTIC_UUID_BASE + "05",
|
|
gatt.Characteristic.Properties.READ
|
|
| gatt.Characteristic.Properties.WRITE
|
|
| gatt.Characteristic.Properties.NOTIFY,
|
|
gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE,
|
|
CustomSerializableClass(11, 12),
|
|
),
|
|
CustomSerializableClass,
|
|
)
|
|
|
|
# Static characteristic with a delegated value.
|
|
c6 = gatt_adapters.DelegatedCharacteristicAdapter(
|
|
gatt.Characteristic(
|
|
CHARACTERISTIC_UUID_BASE + "06",
|
|
gatt.Characteristic.Properties.READ
|
|
| gatt.Characteristic.Properties.WRITE
|
|
| gatt.Characteristic.Properties.NOTIFY,
|
|
gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE,
|
|
CustomClass(1, 2),
|
|
),
|
|
encode=CustomClass.encode,
|
|
decode=CustomClass.decode,
|
|
)
|
|
|
|
# Dynamic characteristic with a bytes value.
|
|
c7 = gatt.Characteristic(
|
|
CHARACTERISTIC_UUID_BASE + "07",
|
|
gatt.Characteristic.Properties.READ
|
|
| gatt.Characteristic.Properties.WRITE
|
|
| gatt.Characteristic.Properties.NOTIFY,
|
|
gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE,
|
|
gatt.CharacteristicValue(
|
|
read=lambda connection: dynamic_read("bytes"),
|
|
write=lambda connection, value: dynamic_write("bytes", value),
|
|
),
|
|
)
|
|
|
|
# Dynamic characteristic with a string value.
|
|
c8 = gatt_adapters.UTF8CharacteristicAdapter(
|
|
gatt.Characteristic(
|
|
CHARACTERISTIC_UUID_BASE + "08",
|
|
gatt.Characteristic.Properties.READ
|
|
| gatt.Characteristic.Properties.WRITE
|
|
| gatt.Characteristic.Properties.NOTIFY,
|
|
gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE,
|
|
gatt.CharacteristicValue(
|
|
read=lambda connection: dynamic_read("string"),
|
|
write=lambda connection, value: dynamic_write("string", value),
|
|
),
|
|
)
|
|
)
|
|
|
|
# Static characteristic with an enum value
|
|
c9 = gatt_adapters.EnumCharacteristicAdapter(
|
|
gatt.Characteristic(
|
|
CHARACTERISTIC_UUID_BASE + "09",
|
|
gatt.Characteristic.Properties.READ
|
|
| gatt.Characteristic.Properties.WRITE
|
|
| gatt.Characteristic.Properties.NOTIFY,
|
|
gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE,
|
|
CustomEnum.FOO,
|
|
),
|
|
cls=CustomEnum,
|
|
length=3,
|
|
byteorder='big',
|
|
)
|
|
|
|
characteristics: list[gatt.Characteristic] = [
|
|
c1,
|
|
c2,
|
|
c3,
|
|
c4,
|
|
c5,
|
|
c6,
|
|
c7,
|
|
c8,
|
|
c9,
|
|
]
|
|
|
|
# Listen for read and write events.
|
|
for characteristic in characteristics:
|
|
characteristic.on(
|
|
"read",
|
|
lambda _, value, c=characteristic: on_characteristic_read(c, value),
|
|
)
|
|
characteristic.on(
|
|
"write",
|
|
lambda _, value, c=characteristic: on_characteristic_write(c, value),
|
|
)
|
|
|
|
device.add_service(gatt.Service(SERVICE_UUID, characteristics))
|
|
|
|
# Notify every 3 seconds
|
|
i = 0
|
|
while True:
|
|
await asyncio.sleep(3)
|
|
|
|
# Notifying can be done with the characteristic's current value, or
|
|
# by explicitly passing a value to notify with. Both variants are used
|
|
# here: for c1..c4 we set the value and then notify, for c4..c9 we notify
|
|
# with an explicit value.
|
|
c1.value = f'hello c1 {i}'.encode()
|
|
await device.notify_subscribers(c1)
|
|
c2.value = f'hello c2 {i}'
|
|
await device.notify_subscribers(c2)
|
|
c3.value = (1000 + i, 2000 + i, 3000 + i)
|
|
await device.notify_subscribers(c3)
|
|
c4.value = {"f1": 4000 + i, "f2": 5000 + i, "f3": 6000 + i}
|
|
await device.notify_subscribers(c4)
|
|
await device.notify_subscribers(c5, CustomSerializableClass(1000 + i, 2000 + i))
|
|
await device.notify_subscribers(c6, CustomClass(3000 + i, 4000 + i))
|
|
await device.notify_subscribers(c7, bytes([1, 2, 3, i % 256]))
|
|
await device.notify_subscribers(c8, f'hello c8 {i}')
|
|
await device.notify_subscribers(
|
|
c9, CustomEnum.FOO if i % 2 == 0 else CustomEnum.BAR
|
|
)
|
|
|
|
i += 1
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
async def main() -> None:
|
|
if len(sys.argv) < 2:
|
|
print("Usage: run_gatt_with_adapters.py <transport-spec> client|server")
|
|
print("example: run_gatt_with_adapters.py usb:0 F0:F1:F2:F3:F4:F5")
|
|
return
|
|
|
|
async with await transport.open_transport(sys.argv[1]) as hci_transport:
|
|
is_client = sys.argv[2] == "client"
|
|
|
|
# Create a device to manage the host
|
|
device = Device.with_hci(
|
|
"Bumble",
|
|
hci.Address(
|
|
DEFAULT_CLIENT_ADDRESS if is_client else DEFAULT_SERVER_ADDRESS
|
|
),
|
|
hci_transport.source,
|
|
hci_transport.sink,
|
|
)
|
|
|
|
# Get things going
|
|
await device.power_on()
|
|
|
|
if is_client:
|
|
# Connect a client to a peer
|
|
await client(device, hci.Address(DEFAULT_SERVER_ADDRESS))
|
|
else:
|
|
# Advertise so a peer can connect
|
|
await device.start_advertising(auto_restart=True)
|
|
|
|
# Setup a server
|
|
await server(device)
|
|
|
|
await hci_transport.source.wait_for_termination()
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
bumble.logging.setup_basic_logging('DEBUG')
|
|
asyncio.run(main())
|