Files
bumble_mirror/bumble/gatt_server.py
zxzxwu f1058e4d4e Merge pull request #859 from istemon/att-read-by-type-request-fix
Return 'invalid handle' for malformed read by type request
2026-01-16 15:09:20 +08:00

1171 lines
43 KiB
Python

# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# GATT - Generic att.Attribute Profile
# Server
#
# See Bluetooth spec @ Vol 3, Part G
#
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import logging
import struct
from collections import defaultdict
from collections.abc import Iterable
from typing import TYPE_CHECKING, TypeVar
from bumble import att, core, l2cap, utils
from bumble.colors import color
from bumble.gatt import (
GATT_CHARACTERISTIC_ATTRIBUTE_TYPE,
GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR,
GATT_MAX_ATTRIBUTE_VALUE_SIZE,
GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE,
GATT_REQUEST_TIMEOUT,
GATT_SECONDARY_SERVICE_ATTRIBUTE_TYPE,
Characteristic,
CharacteristicDeclaration,
Descriptor,
IncludedServiceDeclaration,
Service,
)
if TYPE_CHECKING:
from bumble.device import Device
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
GATT_SERVER_DEFAULT_MAX_MTU = 517
# -----------------------------------------------------------------------------
# Helpers
# -----------------------------------------------------------------------------
def _bearer_id(bearer: att.Bearer) -> str:
if att.is_enhanced_bearer(bearer):
return f'[0x{bearer.connection.handle:04X}|CID=0x{bearer.source_cid:04X}]'
else:
return f'[0x{bearer.handle:04X}]'
# -----------------------------------------------------------------------------
# GATT Server
# -----------------------------------------------------------------------------
class Server(utils.EventEmitter):
attributes: list[att.Attribute]
services: list[Service]
attributes_by_handle: dict[int, att.Attribute]
subscribers: dict[att.Bearer, dict[int, bytes]]
indication_semaphores: defaultdict[att.Bearer, asyncio.Semaphore]
pending_confirmations: defaultdict[att.Bearer, asyncio.futures.Future | None]
EVENT_CHARACTERISTIC_SUBSCRIPTION = "characteristic_subscription"
def __init__(self, device: Device) -> None:
super().__init__()
self.device = device
self.services = []
self.attributes = [] # att.Attributes, ordered by increasing handle values
self.attributes_by_handle = {} # Map for fast attribute access by handle
self.max_mtu = (
GATT_SERVER_DEFAULT_MAX_MTU # The max MTU we're willing to negotiate
)
self.subscribers = (
{}
) # Map of subscriber states by connection handle and attribute handle
self.indication_semaphores = defaultdict(lambda: asyncio.Semaphore(1))
self.pending_confirmations = defaultdict(lambda: None)
def __str__(self) -> str:
return "\n".join(map(str, self.attributes))
def register_eatt(
self, spec: l2cap.LeCreditBasedChannelSpec | None = None
) -> l2cap.LeCreditBasedChannelServer:
def on_channel(channel: l2cap.LeCreditBasedChannel):
logger.debug(
"New EATT Bearer Connection=0x%04X CID=0x%04X",
channel.connection.handle,
channel.source_cid,
)
channel.sink = lambda pdu: self.on_gatt_pdu(
channel, att.ATT_PDU.from_bytes(pdu)
)
return self.device.create_l2cap_server(
spec or l2cap.LeCreditBasedChannelSpec(psm=att.EATT_PSM), handler=on_channel
)
def send_gatt_pdu(self, bearer: att.Bearer, pdu: bytes) -> None:
if att.is_enhanced_bearer(bearer):
bearer.write(pdu)
else:
self.device.send_l2cap_pdu(bearer.handle, att.ATT_CID, pdu)
def next_handle(self) -> int:
return 1 + len(self.attributes)
def get_advertising_service_data(self) -> dict[att.Attribute, bytes]:
return {
attribute: data
for attribute in self.attributes
if isinstance(attribute, Service)
and (data := attribute.get_advertising_data())
}
def get_attribute(self, handle: int) -> att.Attribute | None:
attribute = self.attributes_by_handle.get(handle)
if attribute:
return attribute
# Not in the cached map, perform a linear lookup
for attribute in self.attributes:
if attribute.handle == handle:
# Store in cached map
self.attributes_by_handle[handle] = attribute
return attribute
return None
AttributeGroupType = TypeVar('AttributeGroupType', Service, Characteristic)
def get_attribute_group(
self, handle: int, group_type: type[AttributeGroupType]
) -> AttributeGroupType | None:
return next(
(
attribute
for attribute in self.attributes
if isinstance(attribute, group_type)
and attribute.handle <= handle <= attribute.end_group_handle
),
None,
)
def get_service_attribute(self, service_uuid: core.UUID) -> Service | None:
return next(
(
attribute
for attribute in self.attributes
if attribute.type == GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE
and isinstance(attribute, Service)
and attribute.uuid == service_uuid
),
None,
)
def get_characteristic_attributes(
self, service_uuid: core.UUID, characteristic_uuid: core.UUID
) -> tuple[CharacteristicDeclaration, Characteristic] | None:
service_handle = self.get_service_attribute(service_uuid)
if not service_handle:
return None
return next(
(
(
attribute,
self.get_attribute(attribute.characteristic.handle),
) # type: ignore
for attribute in map(
self.get_attribute,
range(service_handle.handle, service_handle.end_group_handle + 1),
)
if attribute is not None
and attribute.type == GATT_CHARACTERISTIC_ATTRIBUTE_TYPE
and isinstance(attribute, CharacteristicDeclaration)
and attribute.characteristic.uuid == characteristic_uuid
),
None,
)
def get_descriptor_attribute(
self,
service_uuid: core.UUID,
characteristic_uuid: core.UUID,
descriptor_uuid: core.UUID,
) -> Descriptor | None:
characteristics = self.get_characteristic_attributes(
service_uuid, characteristic_uuid
)
if not characteristics:
return None
(_, characteristic_value) = characteristics
return next(
(
attribute # type: ignore
for attribute in map(
self.get_attribute,
range(
characteristic_value.handle + 1,
characteristic_value.end_group_handle + 1,
),
)
if attribute is not None and attribute.type == descriptor_uuid
),
None,
)
def add_attribute(self, attribute: att.Attribute) -> None:
# Assign a handle to this attribute
attribute.handle = self.next_handle()
attribute.end_group_handle = (
attribute.handle
) # TODO: keep track of descriptors in the group
# Add this attribute to the list
self.attributes.append(attribute)
def add_service(self, service: Service) -> None:
# Add the service attribute to the DB
self.add_attribute(service)
# Add all included service
for included_service in service.included_services:
# Not registered yet, register the included service first.
if included_service not in self.services:
self.add_service(included_service)
# TODO: Handle circular service reference
include_declaration = IncludedServiceDeclaration(included_service)
self.add_attribute(include_declaration)
# Add all characteristics
for characteristic in service.characteristics:
# Add a Characteristic Declaration
characteristic_declaration = CharacteristicDeclaration(
characteristic, self.next_handle() + 1
)
self.add_attribute(characteristic_declaration)
# Add the characteristic value
self.add_attribute(characteristic)
# Add the descriptors
for descriptor in characteristic.descriptors:
self.add_attribute(descriptor)
# If the characteristic supports subscriptions, add a CCCD descriptor
# unless there is one already
if (
characteristic.properties
& (
Characteristic.Properties.NOTIFY
| Characteristic.Properties.INDICATE
)
and characteristic.get_descriptor(
GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR
)
is None
):
self.add_attribute(
# pylint: disable=line-too-long
Descriptor(
GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR,
att.Attribute.READABLE | att.Attribute.WRITEABLE,
self.make_descriptor_value(characteristic),
)
)
# Update the service and characteristic group ends
characteristic_declaration.end_group_handle = self.attributes[-1].handle
characteristic.end_group_handle = self.attributes[-1].handle
# Update the service group end
service.end_group_handle = self.attributes[-1].handle
self.services.append(service)
def add_services(self, services: Iterable[Service]) -> None:
for service in services:
self.add_service(service)
def make_descriptor_value(
self, characteristic: Characteristic
) -> att.AttributeValueV2:
# It is necessary to use Attribute Value V2 here to identify the bearer of CCCD.
return att.AttributeValueV2(
lambda bearer, characteristic=characteristic: self.read_cccd(
bearer, characteristic
),
write=lambda bearer, value, characteristic=characteristic: self.write_cccd(
bearer, characteristic, value
),
)
def read_cccd(self, bearer: att.Bearer, characteristic: Characteristic) -> bytes:
subscribers = self.subscribers.get(bearer)
cccd = None
if subscribers:
cccd = subscribers.get(characteristic.handle)
return cccd or bytes([0, 0])
def write_cccd(
self,
bearer: att.Bearer,
characteristic: Characteristic,
value: bytes,
) -> None:
logger.debug(
f'Subscription update for connection={_bearer_id(bearer)}, '
f'handle=0x{characteristic.handle:04X}: {value.hex()}'
)
# Check parameters
if len(value) != 2:
logger.warning('CCCD value not 2 bytes long')
return
cccds = self.subscribers.setdefault(bearer, {})
cccds[characteristic.handle] = value
logger.debug(f'CCCDs: {cccds}')
notify_enabled = value[0] & 0x01 != 0
indicate_enabled = value[0] & 0x02 != 0
characteristic.emit(
characteristic.EVENT_SUBSCRIPTION,
bearer,
notify_enabled,
indicate_enabled,
)
self.emit(
self.EVENT_CHARACTERISTIC_SUBSCRIPTION,
bearer,
characteristic,
notify_enabled,
indicate_enabled,
)
def send_response(self, bearer: att.Bearer, response: att.ATT_PDU) -> None:
logger.debug(f'GATT Response from server: {_bearer_id(bearer)} {response}')
self.send_gatt_pdu(bearer, bytes(response))
async def notify_subscriber(
self,
bearer: att.Bearer,
attribute: att.Attribute,
value: bytes | None = None,
force: bool = False,
) -> None:
if att.is_enhanced_bearer(bearer) or force:
return await self._notify_single_subscriber(bearer, attribute, value, force)
else:
# If API is called to a Connection and not forced, try to notify all subscribed bearers on it.
bearers = [
channel
for channel in self.device.l2cap_channel_manager.le_coc_channels.get(
bearer.handle, {}
).values()
if channel.psm == att.EATT_PSM
] + [bearer]
for bearer in bearers:
await self._notify_single_subscriber(bearer, attribute, value, force)
async def _notify_single_subscriber(
self,
bearer: att.Bearer,
attribute: att.Attribute,
value: bytes | None,
force: bool,
) -> None:
# Check if there's a subscriber
if not force:
subscribers = self.subscribers.get(bearer)
if not subscribers:
logger.debug('not notifying, no subscribers')
return
cccd = subscribers.get(attribute.handle)
if not cccd:
logger.debug(
f'not notifying, no subscribers for handle {attribute.handle:04X}'
)
return
if len(cccd) != 2 or (cccd[0] & 0x01 == 0):
logger.debug(f'not notifying, cccd={cccd.hex()}')
return
# Get or encode the value
value = (
await attribute.read_value(bearer)
if value is None
else attribute.encode_value(value)
)
# Truncate if needed
if len(value) > bearer.att_mtu - 3:
value = value[: bearer.att_mtu - 3]
# Notify
notification = att.ATT_Handle_Value_Notification(
attribute_handle=attribute.handle, attribute_value=value
)
logger.debug(f'GATT Notify from server: {_bearer_id(bearer)} {notification}')
self.send_gatt_pdu(bearer, bytes(notification))
async def indicate_subscriber(
self,
bearer: att.Bearer,
attribute: att.Attribute,
value: bytes | None = None,
force: bool = False,
) -> None:
if att.is_enhanced_bearer(bearer) or force:
return await self._notify_single_subscriber(bearer, attribute, value, force)
else:
# If API is called to a Connection and not forced, try to indicate all subscribed bearers on it.
bearers = [
channel
for channel in self.device.l2cap_channel_manager.le_coc_channels.get(
bearer.handle, {}
).values()
if channel.psm == att.EATT_PSM
] + [bearer]
for bearer in bearers:
await self._indicate_single_bearer(bearer, attribute, value, force)
async def _indicate_single_bearer(
self,
bearer: att.Bearer,
attribute: att.Attribute,
value: bytes | None,
force: bool,
) -> None:
# Check if there's a subscriber
if not force:
subscribers = self.subscribers.get(bearer)
if not subscribers:
logger.debug('not indicating, no subscribers')
return
cccd = subscribers.get(attribute.handle)
if not cccd:
logger.debug(
f'not indicating, no subscribers for handle {attribute.handle:04X}'
)
return
if len(cccd) != 2 or (cccd[0] & 0x02 == 0):
logger.debug(f'not indicating, cccd={cccd.hex()}')
return
# Get or encode the value
value = (
await attribute.read_value(bearer)
if value is None
else attribute.encode_value(value)
)
# Truncate if needed
if len(value) > bearer.att_mtu - 3:
value = value[: bearer.att_mtu - 3]
# Indicate
indication = att.ATT_Handle_Value_Indication(
attribute_handle=attribute.handle, attribute_value=value
)
logger.debug(f'GATT Indicate from server: {_bearer_id(bearer)} {indication}')
# Wait until we can send (only one pending indication at a time per connection)
async with self.indication_semaphores[bearer]:
assert self.pending_confirmations[bearer] is None
# Create a future value to hold the eventual response
pending_confirmation = self.pending_confirmations[bearer] = (
asyncio.get_running_loop().create_future()
)
try:
self.send_gatt_pdu(bearer, bytes(indication))
await asyncio.wait_for(pending_confirmation, GATT_REQUEST_TIMEOUT)
except asyncio.TimeoutError as error:
logger.warning(color('!!! GATT Indicate timeout', 'red'))
raise TimeoutError(f'GATT timeout for {indication.name}') from error
finally:
self.pending_confirmations[bearer] = None
async def _notify_or_indicate_subscribers(
self,
indicate: bool,
attribute: att.Attribute,
value: bytes | None = None,
force: bool = False,
) -> None:
# Get all the bearers for which there's at least one subscription
bearers: list[att.Bearer] = [
bearer
for bearer, subscribers in self.subscribers.items()
if force or subscribers.get(attribute.handle)
]
# Indicate or notify for each connection
if bearers:
coroutine = (
self._indicate_single_bearer
if indicate
else self._notify_single_subscriber
)
await asyncio.wait(
[
asyncio.create_task(coroutine(bearer, attribute, value, force))
for bearer in bearers
]
)
async def notify_subscribers(
self,
attribute: att.Attribute,
value: bytes | None = None,
force: bool = False,
):
return await self._notify_or_indicate_subscribers(
False, attribute, value, force
)
async def indicate_subscribers(
self,
attribute: att.Attribute,
value: bytes | None = None,
force: bool = False,
):
return await self._notify_or_indicate_subscribers(True, attribute, value, force)
def on_disconnection(self, bearer: att.Bearer) -> None:
self.subscribers.pop(bearer, None)
self.indication_semaphores.pop(bearer, None)
self.pending_confirmations.pop(bearer, None)
def on_gatt_pdu(self, bearer: att.Bearer, att_pdu: att.ATT_PDU) -> None:
logger.debug(f'GATT Request to server: {_bearer_id(bearer)} {att_pdu}')
handler_name = f'on_{att_pdu.name.lower()}'
handler = getattr(self, handler_name, None)
if handler is not None:
try:
handler(bearer, att_pdu)
except att.ATT_Error as error:
logger.debug(f'normal exception returned by handler: {error}')
response = att.ATT_Error_Response(
request_opcode_in_error=att_pdu.op_code,
attribute_handle_in_error=error.att_handle,
error_code=error.error_code,
)
self.send_response(bearer, response)
except Exception:
logger.exception(color("!!! Exception in handler:", "red"))
response = att.ATT_Error_Response(
request_opcode_in_error=att_pdu.op_code,
attribute_handle_in_error=0x0000,
error_code=att.ATT_UNLIKELY_ERROR_ERROR,
)
self.send_response(bearer, response)
raise
else:
# No specific handler registered
if att_pdu.op_code in att.ATT_REQUESTS:
# Invoke the generic handler
self.on_att_request(bearer, att_pdu)
else:
# Just ignore
logger.warning(
color(
f'--- Ignoring GATT Request from {_bearer_id(bearer)}: ',
'red',
)
+ str(att_pdu)
)
#######################################################
# ATT handlers
#######################################################
def on_att_request(self, bearer: att.Bearer, pdu: att.ATT_PDU) -> None:
'''
Handler for requests without a more specific handler
'''
logger.warning(
color(
f'--- Unsupported ATT Request from {_bearer_id(bearer)}: ',
'red',
)
+ str(pdu)
)
response = att.ATT_Error_Response(
request_opcode_in_error=pdu.op_code,
attribute_handle_in_error=0x0000,
error_code=att.ATT_REQUEST_NOT_SUPPORTED_ERROR,
)
self.send_response(bearer, response)
def on_att_exchange_mtu_request(
self, bearer: att.Bearer, request: att.ATT_Exchange_MTU_Request
):
'''
See Bluetooth spec Vol 3, Part F - 3.4.2.1 Exchange MTU Request
'''
self.send_response(
bearer, att.ATT_Exchange_MTU_Response(server_rx_mtu=self.max_mtu)
)
# Compute the final MTU
if request.client_rx_mtu >= att.ATT_DEFAULT_MTU:
mtu = min(self.max_mtu, request.client_rx_mtu)
bearer.on_att_mtu_update(mtu)
else:
logger.warning('invalid client_rx_mtu received, MTU not changed')
def on_att_find_information_request(
self, bearer: att.Bearer, request: att.ATT_Find_Information_Request
):
'''
See Bluetooth spec Vol 3, Part F - 3.4.3.1 Find Information Request
'''
response: att.ATT_PDU
# Check the request parameters
if (
request.starting_handle == 0
or request.starting_handle > request.ending_handle
):
self.send_response(
bearer,
att.ATT_Error_Response(
request_opcode_in_error=request.op_code,
attribute_handle_in_error=request.starting_handle,
error_code=att.ATT_INVALID_HANDLE_ERROR,
),
)
return
# Build list of returned attributes
pdu_space_available = bearer.att_mtu - 2
attributes: list[att.Attribute] = []
uuid_size = 0
for attribute in (
attribute
for attribute in self.attributes
if attribute.handle >= request.starting_handle
and attribute.handle <= request.ending_handle
):
this_uuid_size = len(attribute.type.to_pdu_bytes())
if attributes:
# Check if this attribute has the same type size as the previous one
if this_uuid_size != uuid_size:
break
# Check if there's enough space for one more entry
uuid_size = this_uuid_size
if pdu_space_available < 2 + uuid_size:
break
# Add the attribute to the list
attributes.append(attribute)
pdu_space_available -= 2 + uuid_size
# Return the list of attributes
if attributes:
information_data_list = [
struct.pack('<H', attribute.handle) + attribute.type.to_pdu_bytes()
for attribute in attributes
]
response = att.ATT_Find_Information_Response(
format=1 if len(attributes[0].type.to_pdu_bytes()) == 2 else 2,
information_data=b''.join(information_data_list),
)
else:
response = att.ATT_Error_Response(
request_opcode_in_error=request.op_code,
attribute_handle_in_error=request.starting_handle,
error_code=att.ATT_ATTRIBUTE_NOT_FOUND_ERROR,
)
self.send_response(bearer, response)
@utils.AsyncRunner.run_in_task()
async def on_att_find_by_type_value_request(
self, bearer: att.Bearer, request: att.ATT_Find_By_Type_Value_Request
):
'''
See Bluetooth spec Vol 3, Part F - 3.4.3.3 Find By Type Value Request
'''
# Build list of returned attributes
pdu_space_available = bearer.att_mtu - 2
attributes = []
response: att.ATT_PDU
async for attribute in (
attribute
for attribute in self.attributes
if attribute.handle >= request.starting_handle
and attribute.handle <= request.ending_handle
and attribute.type == request.attribute_type
and (await attribute.read_value(bearer)) == request.attribute_value
and pdu_space_available >= 4
):
# TODO: check permissions
# Add the attribute to the list
attributes.append(attribute)
pdu_space_available -= 4
# Return the list of attributes
if attributes:
handles_information_list = []
for attribute in attributes:
if attribute.type in (
GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE,
GATT_SECONDARY_SERVICE_ATTRIBUTE_TYPE,
GATT_CHARACTERISTIC_ATTRIBUTE_TYPE,
):
# Part of a group
group_end_handle = attribute.end_group_handle
else:
# Not part of a group
group_end_handle = attribute.handle
handles_information_list.append(
struct.pack('<HH', attribute.handle, group_end_handle)
)
response = att.ATT_Find_By_Type_Value_Response(
handles_information_list=b''.join(handles_information_list)
)
else:
response = att.ATT_Error_Response(
request_opcode_in_error=request.op_code,
attribute_handle_in_error=request.starting_handle,
error_code=att.ATT_ATTRIBUTE_NOT_FOUND_ERROR,
)
self.send_response(bearer, response)
@utils.AsyncRunner.run_in_task()
async def on_att_read_by_type_request(
self, bearer: att.Bearer, request: att.ATT_Read_By_Type_Request
):
'''
See Bluetooth spec Vol 3, Part F - 3.4.4.1 Read By Type Request
'''
pdu_space_available = bearer.att_mtu - 2
response: att.ATT_PDU = att.ATT_Error_Response(
request_opcode_in_error=request.op_code,
attribute_handle_in_error=request.starting_handle,
error_code=att.ATT_ATTRIBUTE_NOT_FOUND_ERROR,
)
if (
request.starting_handle == 0x0000
or request.starting_handle > request.ending_handle
):
response = att.ATT_Error_Response(
request_opcode_in_error=request.op_code,
attribute_handle_in_error=request.starting_handle,
error_code=att.ATT_INVALID_HANDLE_ERROR,
)
self.send_response(bearer, response)
return
attributes: list[tuple[int, bytes]] = []
for attribute in (
attribute
for attribute in self.attributes
if attribute.type == request.attribute_type
and attribute.handle >= request.starting_handle
and attribute.handle <= request.ending_handle
and pdu_space_available
):
try:
attribute_value = await attribute.read_value(bearer)
except att.ATT_Error as error:
# If the first attribute is unreadable, return an error
# Otherwise return attributes up to this point
if not attributes:
response = att.ATT_Error_Response(
request_opcode_in_error=request.op_code,
attribute_handle_in_error=attribute.handle,
error_code=error.error_code,
)
break
# Check the attribute value size
max_attribute_size = min(bearer.att_mtu - 4, 253)
if len(attribute_value) > max_attribute_size:
# We need to truncate
attribute_value = attribute_value[:max_attribute_size]
if attributes and len(attributes[0][1]) != len(attribute_value):
# Not the same size as previous attribute, stop here
break
# Check if there is enough space
entry_size = 2 + len(attribute_value)
if pdu_space_available < entry_size:
break
# Add the attribute to the list
attributes.append((attribute.handle, attribute_value))
pdu_space_available -= entry_size
if attributes:
attribute_data_list = [
struct.pack('<H', handle) + value for handle, value in attributes
]
response = att.ATT_Read_By_Type_Response(
length=entry_size, attribute_data_list=b''.join(attribute_data_list)
)
else:
logging.debug(f"not found {request}")
self.send_response(bearer, response)
@utils.AsyncRunner.run_in_task()
async def on_att_read_request(
self, bearer: att.Bearer, request: att.ATT_Read_Request
):
'''
See Bluetooth spec Vol 3, Part F - 3.4.4.3 Read Request
'''
response: att.ATT_PDU
if attribute := self.get_attribute(request.attribute_handle):
try:
value = await attribute.read_value(bearer)
except att.ATT_Error as error:
response = att.ATT_Error_Response(
request_opcode_in_error=request.op_code,
attribute_handle_in_error=request.attribute_handle,
error_code=error.error_code,
)
else:
value_size = min(bearer.att_mtu - 1, len(value))
response = att.ATT_Read_Response(attribute_value=value[:value_size])
else:
response = att.ATT_Error_Response(
request_opcode_in_error=request.op_code,
attribute_handle_in_error=request.attribute_handle,
error_code=att.ATT_INVALID_HANDLE_ERROR,
)
self.send_response(bearer, response)
@utils.AsyncRunner.run_in_task()
async def on_att_read_blob_request(
self, bearer: att.Bearer, request: att.ATT_Read_Blob_Request
):
'''
See Bluetooth spec Vol 3, Part F - 3.4.4.5 Read Blob Request
'''
response: att.ATT_PDU
if attribute := self.get_attribute(request.attribute_handle):
try:
value = await attribute.read_value(bearer)
except att.ATT_Error as error:
response = att.ATT_Error_Response(
request_opcode_in_error=request.op_code,
attribute_handle_in_error=request.attribute_handle,
error_code=error.error_code,
)
else:
if request.value_offset > len(value):
response = att.ATT_Error_Response(
request_opcode_in_error=request.op_code,
attribute_handle_in_error=request.attribute_handle,
error_code=att.ATT_INVALID_OFFSET_ERROR,
)
elif len(value) <= bearer.att_mtu - 1:
response = att.ATT_Error_Response(
request_opcode_in_error=request.op_code,
attribute_handle_in_error=request.attribute_handle,
error_code=att.ATT_ATTRIBUTE_NOT_LONG_ERROR,
)
else:
part_size = min(
bearer.att_mtu - 1, len(value) - request.value_offset
)
response = att.ATT_Read_Blob_Response(
part_attribute_value=value[
request.value_offset : request.value_offset + part_size
]
)
else:
response = att.ATT_Error_Response(
request_opcode_in_error=request.op_code,
attribute_handle_in_error=request.attribute_handle,
error_code=att.ATT_INVALID_HANDLE_ERROR,
)
self.send_response(bearer, response)
@utils.AsyncRunner.run_in_task()
async def on_att_read_by_group_type_request(
self, bearer: att.Bearer, request: att.ATT_Read_By_Group_Type_Request
):
'''
See Bluetooth spec Vol 3, Part F - 3.4.4.9 Read by Group Type Request
'''
response: att.ATT_PDU
if request.attribute_group_type not in (
GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE,
GATT_SECONDARY_SERVICE_ATTRIBUTE_TYPE,
):
response = att.ATT_Error_Response(
request_opcode_in_error=request.op_code,
attribute_handle_in_error=request.starting_handle,
error_code=att.ATT_UNSUPPORTED_GROUP_TYPE_ERROR,
)
self.send_response(bearer, response)
return
pdu_space_available = bearer.att_mtu - 2
attributes: list[tuple[int, int, bytes]] = []
for attribute in (
attribute
for attribute in self.attributes
if attribute.type == request.attribute_group_type
and attribute.handle >= request.starting_handle
and attribute.handle <= request.ending_handle
and pdu_space_available
):
# No need to catch permission errors here, since these attributes
# must all be world-readable
attribute_value = await attribute.read_value(bearer)
# Check the attribute value size
max_attribute_size = min(bearer.att_mtu - 6, 251)
if len(attribute_value) > max_attribute_size:
# We need to truncate
attribute_value = attribute_value[:max_attribute_size]
if attributes and len(attributes[0][2]) != len(attribute_value):
# Not the same size as previous attributes, stop here
break
# Check if there is enough space
entry_size = 4 + len(attribute_value)
if pdu_space_available < entry_size:
break
# Add the attribute to the list
attributes.append(
(attribute.handle, attribute.end_group_handle, attribute_value)
)
pdu_space_available -= entry_size
if attributes:
attribute_data_list = [
struct.pack('<HH', handle, end_group_handle) + value
for handle, end_group_handle, value in attributes
]
response = att.ATT_Read_By_Group_Type_Response(
length=len(attribute_data_list[0]),
attribute_data_list=b''.join(attribute_data_list),
)
else:
response = att.ATT_Error_Response(
request_opcode_in_error=request.op_code,
attribute_handle_in_error=request.starting_handle,
error_code=att.ATT_ATTRIBUTE_NOT_FOUND_ERROR,
)
self.send_response(bearer, response)
@utils.AsyncRunner.run_in_task()
async def on_att_read_multiple_request(
self, bearer: att.Bearer, request: att.ATT_Read_Multiple_Request
):
'''
See Bluetooth spec Vol 3, Part F - 3.4.4.7 Read Multiple Request.
'''
response: att.ATT_PDU
pdu_space_available = bearer.att_mtu - 1
values: list[bytes] = []
for handle in request.set_of_handles:
if not (attribute := self.get_attribute(handle)):
response = att.ATT_Error_Response(
request_opcode_in_error=request.op_code,
attribute_handle_in_error=handle,
error_code=att.ATT_ATTRIBUTE_NOT_FOUND_ERROR,
)
self.send_response(bearer, response)
return
# No need to catch permission errors here, since these attributes
# must all be world-readable
attribute_value = await attribute.read_value(bearer)
# Check the attribute value size
max_attribute_size = min(bearer.att_mtu - 1, 251)
if len(attribute_value) > max_attribute_size:
# We need to truncate
attribute_value = attribute_value[:max_attribute_size]
# Check if there is enough space
entry_size = len(attribute_value)
if pdu_space_available < entry_size:
break
# Add the attribute to the list
values.append(attribute_value)
pdu_space_available -= entry_size
response = att.ATT_Read_Multiple_Response(set_of_values=b''.join(values))
self.send_response(bearer, response)
@utils.AsyncRunner.run_in_task()
async def on_att_read_multiple_variable_request(
self, bearer: att.Bearer, request: att.ATT_Read_Multiple_Variable_Request
):
'''
See Bluetooth spec Vol 3, Part F - 3.4.4.11 Read Multiple Variable Request.
'''
response: att.ATT_PDU
pdu_space_available = bearer.att_mtu - 1
length_value_tuple_list: list[tuple[int, bytes]] = []
for handle in request.set_of_handles:
if not (attribute := self.get_attribute(handle)):
response = att.ATT_Error_Response(
request_opcode_in_error=request.op_code,
attribute_handle_in_error=handle,
error_code=att.ATT_ATTRIBUTE_NOT_FOUND_ERROR,
)
self.send_response(bearer, response)
return
# No need to catch permission errors here, since these attributes
# must all be world-readable
attribute_value = await attribute.read_value(bearer)
length = len(attribute_value)
# Check the attribute value size
max_attribute_size = min(bearer.att_mtu - 3, 251)
if len(attribute_value) > max_attribute_size:
# We need to truncate
attribute_value = attribute_value[:max_attribute_size]
# Check if there is enough space
entry_size = 2 + len(attribute_value)
# Add the attribute to the list
length_value_tuple_list.append((length, attribute_value))
pdu_space_available -= entry_size
if pdu_space_available <= 0:
break
response = att.ATT_Read_Multiple_Variable_Response(
length_value_tuple_list=length_value_tuple_list
)
self.send_response(bearer, response)
@utils.AsyncRunner.run_in_task()
async def on_att_write_request(
self, bearer: att.Bearer, request: att.ATT_Write_Request
):
'''
See Bluetooth spec Vol 3, Part F - 3.4.5.1 Write Request
'''
# Check that the attribute exists
attribute = self.get_attribute(request.attribute_handle)
if attribute is None:
self.send_response(
bearer,
att.ATT_Error_Response(
request_opcode_in_error=request.op_code,
attribute_handle_in_error=request.attribute_handle,
error_code=att.ATT_INVALID_HANDLE_ERROR,
),
)
return
# TODO: check permissions
# Check the request parameters
if len(request.attribute_value) > GATT_MAX_ATTRIBUTE_VALUE_SIZE:
self.send_response(
bearer,
att.ATT_Error_Response(
request_opcode_in_error=request.op_code,
attribute_handle_in_error=request.attribute_handle,
error_code=att.ATT_INVALID_ATTRIBUTE_LENGTH_ERROR,
),
)
return
response: att.ATT_PDU
try:
# Accept the value
await attribute.write_value(bearer, request.attribute_value)
except att.ATT_Error as error:
response = att.ATT_Error_Response(
request_opcode_in_error=request.op_code,
attribute_handle_in_error=request.attribute_handle,
error_code=error.error_code,
)
else:
# Done
response = att.ATT_Write_Response()
self.send_response(bearer, response)
@utils.AsyncRunner.run_in_task()
async def on_att_write_command(
self, bearer: att.Bearer, request: att.ATT_Write_Command
):
'''
See Bluetooth spec Vol 3, Part F - 3.4.5.3 Write Command
'''
# Check that the attribute exists
attribute = self.get_attribute(request.attribute_handle)
if attribute is None:
return
# TODO: check permissions
# Check the request parameters
if len(request.attribute_value) > GATT_MAX_ATTRIBUTE_VALUE_SIZE:
return
# Accept the value
try:
await attribute.write_value(bearer, request.attribute_value)
except Exception:
logger.exception('!!! ignoring exception')
def on_att_handle_value_confirmation(
self,
bearer: att.Bearer,
confirmation: att.ATT_Handle_Value_Confirmation,
):
'''
See Bluetooth spec Vol 3, Part F - 3.4.7.3 Handle Value Confirmation
'''
del confirmation # Unused.
if (pending_confirmation := self.pending_confirmations[bearer]) is None:
# Not expected!
logger.warning(
'!!! unexpected confirmation, there is no pending indication'
)
return
pending_confirmation.set_result(None)