Support GATT Service

This commit is contained in:
Josh Wu
2025-01-13 19:29:47 +08:00
parent c1ea0ddd35
commit 1eb9d8d055
6 changed files with 398 additions and 36 deletions

View File

@@ -93,6 +93,7 @@ from bumble import smp
from bumble import sdp
from bumble import l2cap
from bumble import core
from bumble.profiles import gatt_service
if TYPE_CHECKING:
from .transport.common import TransportSource, TransportSink
@@ -1756,6 +1757,8 @@ class DeviceConfiguration:
cis_enabled: bool = False
identity_address_type: Optional[int] = None
io_capability: int = pairing.PairingDelegate.IoCapability.NO_OUTPUT_NO_INPUT
gap_service_enabled: bool = True
gatt_service_enabled: bool = True
def __post_init__(self) -> None:
self.gatt_services: list[Dict[str, Any]] = []
@@ -1938,6 +1941,7 @@ class Device(CompositeEventEmitter):
bis_links = dict[int, BisLink]()
big_syncs = dict[int, BigSync]()
_pending_cis: Dict[int, tuple[int, int]]
gatt_service: gatt_service.GenericAttributeProfileService | None = None
@composite_listener
class Listener:
@@ -2004,7 +2008,6 @@ class Device(CompositeEventEmitter):
address: Optional[hci.Address] = None,
config: Optional[DeviceConfiguration] = None,
host: Optional[Host] = None,
generic_access_service: bool = True,
) -> None:
super().__init__()
@@ -2151,7 +2154,10 @@ class Device(CompositeEventEmitter):
# Register the SDP server with the L2CAP Channel Manager
self.sdp_server.register(self.l2cap_channel_manager)
self.add_default_services(generic_access_service)
self.add_default_services(
add_gap_service=config.gap_service_enabled,
add_gatt_service=config.gatt_service_enabled,
)
self.l2cap_channel_manager.register_fixed_channel(ATT_CID, self.on_gatt_pdu)
# Forward some events
@@ -4515,10 +4521,15 @@ class Device(CompositeEventEmitter):
def add_services(self, services):
self.gatt_server.add_services(services)
def add_default_services(self, generic_access_service=True):
def add_default_services(
self, add_gap_service: bool = True, add_gatt_service: bool = True
) -> None:
# Add a GAP Service if requested
if generic_access_service:
if add_gap_service:
self.gatt_server.add_service(GenericAccessService(self.name))
if add_gatt_service:
self.gatt_service = gatt_service.GenericAttributeProfileService()
self.gatt_server.add_service(self.gatt_service)
async def notify_subscriber(self, connection, attribute, value=None, force=False):
await self.gatt_server.notify_subscriber(connection, attribute, value, force)