Merge pull request #707 from zxzxwu/typing

Replace pre-3.9 typing aliases
This commit is contained in:
zxzxwu
2025-06-09 14:39:29 +08:00
committed by GitHub
68 changed files with 366 additions and 424 deletions

View File

@@ -29,9 +29,7 @@ from typing import (
Any,
AsyncGenerator,
Coroutine,
Deque,
Optional,
Tuple,
)
import click
@@ -131,7 +129,7 @@ class BroadcastScanner(bumble.utils.EventEmitter):
basic_audio_announcement: Optional[bap.BasicAudioAnnouncement] = None
appearance: Optional[core.Appearance] = None
biginfo: Optional[bumble.device.BIGInfoAdvertisement] = None
manufacturer_data: Optional[Tuple[str, bytes]] = None
manufacturer_data: Optional[tuple[str, bytes]] = None
def __post_init__(self) -> None:
super().__init__()
@@ -748,7 +746,9 @@ async def run_receive(
sample_rate_hz=sampling_frequency.hz,
num_channels=num_bis,
)
lc3_queues: list[Deque[bytes]] = [collections.deque() for i in range(num_bis)]
lc3_queues: list[collections.deque[bytes]] = [
collections.deque() for i in range(num_bis)
]
packet_stats = [0, 0]
audio_output = await audio_io.create_audio_output(output)
@@ -764,7 +764,7 @@ async def run_receive(
)
)
def sink(queue: Deque[bytes], packet: hci.HCI_IsoDataPacket):
def sink(queue: collections.deque[bytes], packet: hci.HCI_IsoDataPacket):
# TODO: re-assemble fragments and detect errors
queue.append(packet.iso_sdu_fragment)

View File

@@ -55,7 +55,7 @@ from prompt_toolkit.layout import (
from bumble import __version__
import bumble.core
from bumble import colors
from bumble.core import UUID, AdvertisingData, PhysicalTransport
from bumble.core import UUID, AdvertisingData
from bumble.device import (
ConnectionParametersPreferences,
ConnectionPHY,

View File

@@ -4,7 +4,7 @@ import logging
import json
from bumble.pandora import PandoraDevice, Config, serve
from typing import Dict, Any
from typing import Any
BUMBLE_SERVER_GRPC_PORT = 7999
ROOTCANAL_PORT_CUTTLEFISH = 7300
@@ -39,7 +39,7 @@ def main(grpc_port: int, rootcanal_port: int, transport: str, config: str) -> No
asyncio.run(serve(device, config=server_config, port=grpc_port))
def retrieve_config(config: str) -> Dict[str, Any]:
def retrieve_config(config: str) -> dict[str, Any]:
if not config:
return {}

View File

@@ -16,6 +16,7 @@
# Imports
# -----------------------------------------------------------------------------
import datetime
import importlib
import logging
import os
import struct
@@ -154,9 +155,10 @@ class Printer:
def main(format, vendor, filename):
for vendor_name in vendor:
if vendor_name == 'android':
import bumble.vendor.android.hci
# Prevent being deleted by linter.
importlib.import_module('bumble.vendor.android.hci')
elif vendor_name == 'zephyr':
import bumble.vendor.zephyr.hci
importlib.import_module('bumble.vendor.zephyr.hci')
input = open(filename, 'rb')
if format == 'h4':

View File

@@ -25,7 +25,7 @@ import os
import logging
import pathlib
import subprocess
from typing import Dict, List, Optional
from typing import Optional
import weakref
import click
@@ -448,7 +448,7 @@ class Speaker:
# Create an HTTP server for the UI
self.ui_server = UiServer(speaker=self, port=ui_port)
def sdp_records(self) -> Dict[int, List[ServiceAttribute]]:
def sdp_records(self) -> dict[int, list[ServiceAttribute]]:
service_record_handle = 0x00010001
return {
service_record_handle: make_audio_sink_service_sdp_records(

View File

@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List, Union
from typing import Union
from bumble import core
@@ -21,7 +21,7 @@ class AtParsingError(core.InvalidPacketError):
"""Error raised when parsing AT commands fails."""
def tokenize_parameters(buffer: bytes) -> List[bytes]:
def tokenize_parameters(buffer: bytes) -> list[bytes]:
"""Split input parameters into tokens.
Removes space characters outside of double quote blocks:
T-rec-V-25 - 5.2.1 Command line general format: "Space characters (IA5 2/0)
@@ -63,12 +63,12 @@ def tokenize_parameters(buffer: bytes) -> List[bytes]:
return [bytes(token) for token in tokens if len(token) > 0]
def parse_parameters(buffer: bytes) -> List[Union[bytes, list]]:
def parse_parameters(buffer: bytes) -> list[Union[bytes, list]]:
"""Parse the parameters using the comma and parenthesis separators.
Raises AtParsingError in case of invalid input string."""
tokens = tokenize_parameters(buffer)
accumulator: List[list] = [[]]
accumulator: list[list] = [[]]
current: Union[bytes, list] = bytes()
for token in tokens:

View File

@@ -32,10 +32,6 @@ from typing import (
Awaitable,
Callable,
Generic,
Dict,
List,
Optional,
Type,
TypeVar,
Union,
TYPE_CHECKING,
@@ -251,7 +247,7 @@ class ATT_PDU:
See Bluetooth spec @ Vol 3, Part F - 3.3 ATTRIBUTE PDU
'''
pdu_classes: Dict[int, Type[ATT_PDU]] = {}
pdu_classes: dict[int, type[ATT_PDU]] = {}
op_code = 0
name: str
@@ -818,7 +814,7 @@ class Attribute(utils.EventEmitter, Generic[_T]):
# The check for `p.name is not None` here is needed because for InFlag
# enums, the .name property can be None, when the enum value is 0,
# so the type hint for .name is Optional[str].
enum_list: List[str] = [p.name for p in cls if p.name is not None]
enum_list: list[str] = [p.name for p in cls if p.name is not None]
enum_list_str = ",".join(enum_list)
raise TypeError(
f"Attribute::permissions error:\nExpected a string containing any of the keys, separated by commas: {enum_list_str}\nGot: {permissions_str}"

View File

@@ -18,7 +18,7 @@
from __future__ import annotations
import enum
import struct
from typing import Dict, Type, Union, Tuple
from typing import Union
from bumble import core
from bumble import utils
@@ -213,11 +213,11 @@ class CommandFrame(Frame):
NOTIFY = 0x03
GENERAL_INQUIRY = 0x04
subclasses: Dict[Frame.OperationCode, Type[CommandFrame]] = {}
subclasses: dict[Frame.OperationCode, type[CommandFrame]] = {}
ctype: CommandType
@staticmethod
def parse_operands(operands: bytes) -> Tuple:
def parse_operands(operands: bytes) -> tuple:
raise NotImplementedError
def __init__(
@@ -251,11 +251,11 @@ class ResponseFrame(Frame):
CHANGED = 0x0D
INTERIM = 0x0F
subclasses: Dict[Frame.OperationCode, Type[ResponseFrame]] = {}
subclasses: dict[Frame.OperationCode, type[ResponseFrame]] = {}
response: ResponseCode
@staticmethod
def parse_operands(operands: bytes) -> Tuple:
def parse_operands(operands: bytes) -> tuple:
raise NotImplementedError
def __init__(
@@ -282,7 +282,7 @@ class VendorDependentFrame:
vendor_dependent_data: bytes
@staticmethod
def parse_operands(operands: bytes) -> Tuple:
def parse_operands(operands: bytes) -> tuple:
return (
struct.unpack(">I", b"\x00" + operands[:3])[0],
operands[3:],
@@ -432,7 +432,7 @@ class PassThroughFrame:
operation_data: bytes
@staticmethod
def parse_operands(operands: bytes) -> Tuple:
def parse_operands(operands: bytes) -> tuple:
return (
PassThroughFrame.StateFlag(operands[0] >> 7),
PassThroughFrame.OperationId(operands[0] & 0x7F),

View File

@@ -19,7 +19,7 @@ from __future__ import annotations
from enum import IntEnum
import logging
import struct
from typing import Callable, cast, Dict, Optional
from typing import Callable, cast, Optional
from bumble.colors import color
from bumble import avc
@@ -146,9 +146,9 @@ class MessageAssembler:
# -----------------------------------------------------------------------------
class Protocol:
CommandHandler = Callable[[int, avc.CommandFrame], None]
command_handlers: Dict[int, CommandHandler] # Command handlers, by PID
command_handlers: dict[int, CommandHandler] # Command handlers, by PID
ResponseHandler = Callable[[int, Optional[avc.ResponseFrame]], None]
response_handlers: Dict[int, ResponseHandler] # Response handlers, by PID
response_handlers: dict[int, ResponseHandler] # Response handlers, by PID
next_transaction_label: int
message_assembler: MessageAssembler

View File

@@ -24,12 +24,8 @@ import warnings
from typing import (
Any,
Awaitable,
Dict,
Type,
Tuple,
Optional,
Callable,
List,
AsyncGenerator,
Iterable,
Union,
@@ -227,7 +223,7 @@ AVDTP_STATE_NAMES = {
# -----------------------------------------------------------------------------
async def find_avdtp_service_with_sdp_client(
sdp_client: sdp.Client,
) -> Optional[Tuple[int, int]]:
) -> Optional[tuple[int, int]]:
'''
Find an AVDTP service, using a connected SDP client, and return its version,
or None if none is found
@@ -257,7 +253,7 @@ async def find_avdtp_service_with_sdp_client(
# -----------------------------------------------------------------------------
async def find_avdtp_service_with_connection(
connection: device.Connection,
) -> Optional[Tuple[int, int]]:
) -> Optional[tuple[int, int]]:
'''
Find an AVDTP service, for a connection, and return its version,
or None if none is found
@@ -451,7 +447,7 @@ class ServiceCapabilities:
service_category: int, service_capabilities_bytes: bytes
) -> ServiceCapabilities:
# Select the appropriate subclass
cls: Type[ServiceCapabilities]
cls: type[ServiceCapabilities]
if service_category == AVDTP_MEDIA_CODEC_SERVICE_CATEGORY:
cls = MediaCodecCapabilities
else:
@@ -466,7 +462,7 @@ class ServiceCapabilities:
return instance
@staticmethod
def parse_capabilities(payload: bytes) -> List[ServiceCapabilities]:
def parse_capabilities(payload: bytes) -> list[ServiceCapabilities]:
capabilities = []
while payload:
service_category = payload[0]
@@ -499,7 +495,7 @@ class ServiceCapabilities:
self.service_category = service_category
self.service_capabilities_bytes = service_capabilities_bytes
def to_string(self, details: Optional[List[str]] = None) -> str:
def to_string(self, details: Optional[list[str]] = None) -> str:
attributes = ','.join(
[name_or_number(AVDTP_SERVICE_CATEGORY_NAMES, self.service_category)]
+ (details or [])
@@ -612,7 +608,7 @@ class Message: # pylint:disable=attribute-defined-outside-init
RESPONSE_REJECT = 3
# Subclasses, by signal identifier and message type
subclasses: Dict[int, Dict[int, Type[Message]]] = {}
subclasses: dict[int, dict[int, type[Message]]] = {}
message_type: MessageType
signal_identifier: int
@@ -757,7 +753,7 @@ class Discover_Response(Message):
See Bluetooth AVDTP spec - 8.6.2 Stream End Point Discovery Response
'''
endpoints: List[EndPointInfo]
endpoints: list[EndPointInfo]
def init_from_payload(self):
self.endpoints = []
@@ -1202,10 +1198,10 @@ class DelayReport_Reject(Simple_Reject):
# -----------------------------------------------------------------------------
class Protocol(utils.EventEmitter):
local_endpoints: List[LocalStreamEndPoint]
remote_endpoints: Dict[int, DiscoveredStreamEndPoint]
streams: Dict[int, Stream]
transaction_results: List[Optional[asyncio.Future[Message]]]
local_endpoints: list[LocalStreamEndPoint]
remote_endpoints: dict[int, DiscoveredStreamEndPoint]
streams: dict[int, Stream]
transaction_results: list[Optional[asyncio.Future[Message]]]
channel_connector: Callable[[], Awaitable[l2cap.ClassicChannel]]
EVENT_OPEN = "open"
@@ -1223,7 +1219,7 @@ class Protocol(utils.EventEmitter):
@staticmethod
async def connect(
connection: device.Connection, version: Tuple[int, int] = (1, 3)
connection: device.Connection, version: tuple[int, int] = (1, 3)
) -> Protocol:
channel = await connection.create_l2cap_channel(
spec=l2cap.ClassicChannelSpec(psm=AVDTP_PSM)
@@ -1233,7 +1229,7 @@ class Protocol(utils.EventEmitter):
return protocol
def __init__(
self, l2cap_channel: l2cap.ClassicChannel, version: Tuple[int, int] = (1, 3)
self, l2cap_channel: l2cap.ClassicChannel, version: tuple[int, int] = (1, 3)
) -> None:
super().__init__()
self.l2cap_channel = l2cap_channel
@@ -1502,7 +1498,7 @@ class Protocol(utils.EventEmitter):
return response
async def start_transaction(self) -> Tuple[int, asyncio.Future[Message]]:
async def start_transaction(self) -> tuple[int, asyncio.Future[Message]]:
# Wait until we can start a new transaction
await self.transaction_semaphore.acquire()
@@ -1703,7 +1699,7 @@ class Protocol(utils.EventEmitter):
# -----------------------------------------------------------------------------
class Listener(utils.EventEmitter):
servers: Dict[int, Protocol]
servers: dict[int, Protocol]
EVENT_CONNECTION = "connection"
@@ -1735,7 +1731,7 @@ class Listener(utils.EventEmitter):
@classmethod
def for_device(
cls, device: device.Device, version: Tuple[int, int] = (1, 3)
cls, device: device.Device, version: tuple[int, int] = (1, 3)
) -> Listener:
listener = Listener(registrar=None, version=version)
l2cap_server = device.create_l2cap_server(

View File

@@ -26,14 +26,11 @@ from typing import (
Awaitable,
Callable,
cast,
Dict,
Iterable,
List,
Optional,
Sequence,
SupportsBytes,
Tuple,
Type,
TypeVar,
Union,
)
@@ -84,10 +81,10 @@ AVRCP_BLUETOOTH_SIG_COMPANY_ID = 0x001958
# -----------------------------------------------------------------------------
def make_controller_service_sdp_records(
service_record_handle: int,
avctp_version: Tuple[int, int] = (1, 4),
avrcp_version: Tuple[int, int] = (1, 6),
avctp_version: tuple[int, int] = (1, 4),
avrcp_version: tuple[int, int] = (1, 6),
supported_features: int = 1,
) -> List[ServiceAttribute]:
) -> list[ServiceAttribute]:
# TODO: support a way to compute the supported features from a feature list
avctp_version_int = avctp_version[0] << 8 | avctp_version[1]
avrcp_version_int = avrcp_version[0] << 8 | avrcp_version[1]
@@ -152,10 +149,10 @@ def make_controller_service_sdp_records(
# -----------------------------------------------------------------------------
def make_target_service_sdp_records(
service_record_handle: int,
avctp_version: Tuple[int, int] = (1, 4),
avrcp_version: Tuple[int, int] = (1, 6),
avctp_version: tuple[int, int] = (1, 4),
avrcp_version: tuple[int, int] = (1, 6),
supported_features: int = 0x23,
) -> List[ServiceAttribute]:
) -> list[ServiceAttribute]:
# TODO: support a way to compute the supported features from a feature list
avctp_version_int = avctp_version[0] << 8 | avctp_version[1]
avrcp_version_int = avrcp_version[0] << 8 | avrcp_version[1]
@@ -291,7 +288,7 @@ class Command:
pdu_id: Protocol.PduId
parameter: bytes
def to_string(self, properties: Dict[str, str]) -> str:
def to_string(self, properties: dict[str, str]) -> str:
properties_str = ",".join(
[f"{name}={value}" for name, value in properties.items()]
)
@@ -337,7 +334,7 @@ class GetPlayStatusCommand(Command):
# -----------------------------------------------------------------------------
class GetElementAttributesCommand(Command):
identifier: int
attribute_ids: List[MediaAttributeId]
attribute_ids: list[MediaAttributeId]
@classmethod
def from_bytes(cls, pdu: bytes) -> GetElementAttributesCommand:
@@ -409,7 +406,7 @@ class Response:
pdu_id: Protocol.PduId
parameter: bytes
def to_string(self, properties: Dict[str, str]) -> str:
def to_string(self, properties: dict[str, str]) -> str:
properties_str = ",".join(
[f"{name}={value}" for name, value in properties.items()]
)
@@ -454,7 +451,7 @@ class NotImplementedResponse(Response):
# -----------------------------------------------------------------------------
class GetCapabilitiesResponse(Response):
capability_id: GetCapabilitiesCommand.CapabilityId
capabilities: List[Union[SupportsBytes, bytes]]
capabilities: list[Union[SupportsBytes, bytes]]
@classmethod
def from_bytes(cls, pdu: bytes) -> GetCapabilitiesResponse:
@@ -467,7 +464,7 @@ class GetCapabilitiesResponse(Response):
capability_id = GetCapabilitiesCommand.CapabilityId(pdu[0])
capability_count = pdu[1]
capabilities: List[Union[SupportsBytes, bytes]]
capabilities: list[Union[SupportsBytes, bytes]]
if capability_id == GetCapabilitiesCommand.CapabilityId.EVENTS_SUPPORTED:
capabilities = [EventId(pdu[2 + x]) for x in range(capability_count)]
else:
@@ -540,13 +537,13 @@ class GetPlayStatusResponse(Response):
# -----------------------------------------------------------------------------
class GetElementAttributesResponse(Response):
attributes: List[MediaAttribute]
attributes: list[MediaAttribute]
@classmethod
def from_bytes(cls, pdu: bytes) -> GetElementAttributesResponse:
num_attributes = pdu[0]
offset = 1
attributes: List[MediaAttribute] = []
attributes: list[MediaAttribute] = []
for _ in range(num_attributes):
(
attribute_id_int,
@@ -817,7 +814,7 @@ class PlayerApplicationSettingChangedEvent(Event):
attribute_id: ApplicationSetting.AttributeId
value_id: utils.OpenIntEnum
player_application_settings: List[Setting]
player_application_settings: list[Setting]
@classmethod
def from_bytes(cls, pdu: bytes) -> PlayerApplicationSettingChangedEvent:
@@ -939,7 +936,7 @@ class VolumeChangedEvent(Event):
# -----------------------------------------------------------------------------
EVENT_SUBCLASSES: Dict[EventId, Type[Event]] = {
EVENT_SUBCLASSES: dict[EventId, type[Event]] = {
EventId.PLAYBACK_STATUS_CHANGED: PlaybackStatusChangedEvent,
EventId.PLAYBACK_POS_CHANGED: PlaybackPositionChangedEvent,
EventId.TRACK_CHANGED: TrackChangedEvent,
@@ -967,14 +964,14 @@ class Delegate:
def __init__(self, status_code: Protocol.StatusCode) -> None:
self.status_code = status_code
supported_events: List[EventId]
supported_events: list[EventId]
volume: int
def __init__(self, supported_events: Iterable[EventId] = ()) -> None:
self.supported_events = list(supported_events)
self.volume = 0
async def get_supported_events(self) -> List[EventId]:
async def get_supported_events(self) -> list[EventId]:
return self.supported_events
async def set_absolute_volume(self, volume: int) -> None:
@@ -1124,8 +1121,8 @@ class Protocol(utils.EventEmitter):
receive_response_state: Optional[ReceiveResponseState]
avctp_protocol: Optional[avctp.Protocol]
free_commands: asyncio.Queue
pending_commands: Dict[int, PendingCommand] # Pending commands, by label
notification_listeners: Dict[EventId, NotificationListener]
pending_commands: dict[int, PendingCommand] # Pending commands, by label
notification_listeners: dict[EventId, NotificationListener]
@staticmethod
def _check_vendor_dependent_frame(
@@ -1190,7 +1187,7 @@ class Protocol(utils.EventEmitter):
@staticmethod
def _check_response(
response_context: ResponseContext, expected_type: Type[_R]
response_context: ResponseContext, expected_type: type[_R]
) -> _R:
if isinstance(response_context, Protocol.FinalResponse):
if (
@@ -1230,7 +1227,7 @@ class Protocol(utils.EventEmitter):
utils.AsyncRunner.spawn(call())
async def get_supported_events(self) -> List[EventId]:
async def get_supported_events(self) -> list[EventId]:
"""Get the list of events supported by the connected peer."""
response_context = await self.send_avrcp_command(
avc.CommandFrame.CommandType.STATUS,
@@ -1253,7 +1250,7 @@ class Protocol(utils.EventEmitter):
async def get_element_attributes(
self, element_identifier: int, attribute_ids: Sequence[MediaAttributeId]
) -> List[MediaAttribute]:
) -> list[MediaAttribute]:
"""Get element attributes from the connected peer."""
response_context = await self.send_avrcp_command(
avc.CommandFrame.CommandType.STATUS,
@@ -1335,7 +1332,7 @@ class Protocol(utils.EventEmitter):
async def monitor_player_application_settings(
self,
) -> AsyncIterator[List[PlayerApplicationSettingChangedEvent.Setting]]:
) -> AsyncIterator[list[PlayerApplicationSettingChangedEvent.Setting]]:
"""Monitor Player Application Setting changes from the connected peer."""
async for event in self.monitor_events(
EventId.PLAYER_APPLICATION_SETTING_CHANGED, 0

View File

@@ -13,7 +13,7 @@
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
from functools import partial
from typing import List, Optional, Union
from typing import Optional, Union
class ColorError(ValueError):
@@ -65,7 +65,7 @@ def color(
bg: Optional[ColorSpec] = None,
style: Optional[str] = None,
) -> str:
codes: List[ColorSpec] = []
codes: list[ColorSpec] = []
if fg:
codes.append(_color_code(fg, 30))

View File

@@ -63,7 +63,7 @@ from bumble.hci import (
HCI_Packet,
HCI_Role_Change_Event,
)
from typing import Optional, Union, Dict, Any, TYPE_CHECKING
from typing import Optional, Union, Any, TYPE_CHECKING
if TYPE_CHECKING:
from bumble.link import LocalLink
@@ -132,17 +132,17 @@ class Controller:
self.hci_sink = None
self.link = link
self.central_connections: Dict[Address, Connection] = (
self.central_connections: dict[Address, Connection] = (
{}
) # Connections where this controller is the central
self.peripheral_connections: Dict[Address, Connection] = (
self.peripheral_connections: dict[Address, Connection] = (
{}
) # Connections where this controller is the peripheral
self.classic_connections: Dict[Address, Connection] = (
self.classic_connections: dict[Address, Connection] = (
{}
) # Connections in BR/EDR
self.central_cis_links: Dict[int, CisLink] = {} # CIS links by handle
self.peripheral_cis_links: Dict[int, CisLink] = {} # CIS links by handle
self.central_cis_links: dict[int, CisLink] = {} # CIS links by handle
self.peripheral_cis_links: dict[int, CisLink] = {} # CIS links by handle
self.hci_version = HCI_VERSION_BLUETOOTH_CORE_5_0
self.hci_revision = 0

View File

@@ -37,10 +37,7 @@ from typing import (
Any,
Callable,
ClassVar,
Deque,
Dict,
Optional,
Type,
TypeVar,
Union,
cast,
@@ -436,7 +433,7 @@ class AdvertisingEventProperties:
@classmethod
def from_advertising_type(
cls: Type[AdvertisingEventProperties],
cls: type[AdvertisingEventProperties],
advertising_type: AdvertisingType,
) -> AdvertisingEventProperties:
return cls(
@@ -1343,7 +1340,7 @@ class Peer:
return self.gatt_client.get_characteristics_by_uuid(uuid, service)
def create_service_proxy(
self, proxy_class: Type[_PROXY_CLASS]
self, proxy_class: type[_PROXY_CLASS]
) -> Optional[_PROXY_CLASS]:
if proxy := proxy_class.from_client(self.gatt_client):
return cast(_PROXY_CLASS, proxy)
@@ -1351,7 +1348,7 @@ class Peer:
return None
async def discover_service_and_create_proxy(
self, proxy_class: Type[_PROXY_CLASS]
self, proxy_class: type[_PROXY_CLASS]
) -> Optional[_PROXY_CLASS]:
# Discover the first matching service and its characteristics
services = await self.discover_service(proxy_class.SERVICE_CLASS.UUID)
@@ -1547,7 +1544,7 @@ class IsoPacketStream:
self.iso_link = iso_link
self.data_packet_queue = iso_link.data_packet_queue
self.data_packet_queue.on('flow', self._on_flow)
self._thresholds: Deque[int] = collections.deque()
self._thresholds: collections.deque[int] = collections.deque()
self._semaphore = asyncio.Semaphore(max_queue_size)
def _on_flow(self) -> None:
@@ -1956,9 +1953,9 @@ class DeviceConfiguration:
gatt_service_enabled: bool = True
def __post_init__(self) -> None:
self.gatt_services: list[Dict[str, Any]] = []
self.gatt_services: list[dict[str, Any]] = []
def load_from_dict(self, config: Dict[str, Any]) -> None:
def load_from_dict(self, config: dict[str, Any]) -> None:
config = copy.deepcopy(config)
# Load simple properties
@@ -2018,13 +2015,13 @@ class DeviceConfiguration:
self.load_from_dict(json.load(file))
@classmethod
def from_file(cls: Type[Self], filename: str) -> Self:
def from_file(cls: type[Self], filename: str) -> Self:
config = cls()
config.load_from_file(filename)
return config
@classmethod
def from_dict(cls: Type[Self], config: Dict[str, Any]) -> Self:
def from_dict(cls: type[Self], config: dict[str, Any]) -> Self:
device_config = cls()
device_config.load_from_dict(config)
return device_config
@@ -2121,22 +2118,22 @@ class Device(utils.CompositeEventEmitter):
advertising_data: bytes
scan_response_data: bytes
cs_capabilities: ChannelSoundingCapabilities | None = None
connections: Dict[int, Connection]
pending_connections: Dict[hci.Address, Connection]
classic_pending_accepts: Dict[
connections: dict[int, Connection]
pending_connections: dict[hci.Address, Connection]
classic_pending_accepts: dict[
hci.Address,
list[asyncio.Future[Union[Connection, tuple[hci.Address, int, int]]]],
]
advertisement_accumulators: Dict[hci.Address, AdvertisementDataAccumulator]
advertisement_accumulators: dict[hci.Address, AdvertisementDataAccumulator]
periodic_advertising_syncs: list[PeriodicAdvertisingSync]
config: DeviceConfiguration
legacy_advertiser: Optional[LegacyAdvertiser]
sco_links: Dict[int, ScoLink]
cis_links: Dict[int, CisLink]
sco_links: dict[int, ScoLink]
cis_links: dict[int, CisLink]
bigs: dict[int, Big]
bis_links: dict[int, BisLink]
big_syncs: dict[int, BigSync]
_pending_cis: Dict[int, tuple[int, int]]
_pending_cis: dict[int, tuple[int, int]]
gatt_service: gatt_service.GenericAttributeProfileService | None = None
EVENT_ADVERTISEMENT = "advertisement"
@@ -2296,8 +2293,8 @@ class Device(utils.CompositeEventEmitter):
self.address_generation_offload = config.address_generation_offload
# Extended advertising.
self.extended_advertising_sets: Dict[int, AdvertisingSet] = {}
self.connecting_extended_advertising_sets: Dict[int, AdvertisingSet] = {}
self.extended_advertising_sets: dict[int, AdvertisingSet] = {}
self.connecting_extended_advertising_sets: dict[int, AdvertisingSet] = {}
# Legacy advertising.
# The advertising and scan response data, as well as the advertising interval
@@ -4273,11 +4270,11 @@ class Device(utils.CompositeEventEmitter):
self.smp_manager.pairing_config_factory = pairing_config_factory
@property
def smp_session_proxy(self) -> Type[smp.Session]:
def smp_session_proxy(self) -> type[smp.Session]:
return self.smp_manager.session_proxy
@smp_session_proxy.setter
def smp_session_proxy(self, session_proxy: Type[smp.Session]) -> None:
def smp_session_proxy(self, session_proxy: type[smp.Session]) -> None:
self.smp_manager.session_proxy = session_proxy
async def pair(self, connection):

View File

@@ -23,7 +23,7 @@ from __future__ import annotations
import logging
import pathlib
import platform
from typing import Dict, Iterable, Optional, Type, TYPE_CHECKING
from typing import Iterable, Optional, TYPE_CHECKING
from bumble.drivers import rtk, intel
from bumble.drivers.common import Driver
@@ -45,7 +45,7 @@ async def get_driver_for_host(host: Host) -> Optional[Driver]:
found.
If a "driver" HCI metadata entry is present, only that driver class will be probed.
"""
driver_classes: Dict[str, Type[Driver]] = {"rtk": rtk.Driver, "intel": intel.Driver}
driver_classes: dict[str, type[Driver]] = {"rtk": rtk.Driver, "intel": intel.Driver}
probe_list: Iterable[str]
if driver_name := host.hci_metadata.get("driver"):
# Only probe a single driver

View File

@@ -20,8 +20,6 @@ Common types for drivers.
# -----------------------------------------------------------------------------
import abc
from bumble import core
# -----------------------------------------------------------------------------
# Classes

View File

@@ -28,7 +28,7 @@ import os
import pathlib
import platform
import struct
from typing import Any, Deque, Optional, TYPE_CHECKING
from typing import Any, Optional, TYPE_CHECKING
from bumble import core
from bumble.drivers import common
@@ -348,7 +348,7 @@ class Driver(common.Driver):
def __init__(self, host: Host) -> None:
self.host = host
self.max_in_flight_firmware_load_commands = 1
self.pending_firmware_load_commands: Deque[hci.HCI_Command] = (
self.pending_firmware_load_commands: collections.deque[hci.HCI_Command] = (
collections.deque()
)
self.can_send_firmware_load_command = asyncio.Event()

View File

@@ -29,7 +29,6 @@ import os
import pathlib
import platform
import struct
from typing import Tuple
import weakref
@@ -294,7 +293,7 @@ class Driver(common.Driver):
@dataclass
class DriverInfo:
rom: int
hci: Tuple[int, int]
hci: tuple[int, int]
config_needed: bool
has_rom_version: bool
has_msft_ext: bool = False

View File

@@ -27,7 +27,7 @@ import enum
import functools
import logging
import struct
from typing import Iterable, List, Optional, Sequence, TypeVar, Union
from typing import Iterable, Optional, Sequence, TypeVar, Union
from bumble.colors import color
from bumble.core import BaseBumbleError, UUID
@@ -350,8 +350,8 @@ class Service(Attribute):
'''
uuid: UUID
characteristics: List[Characteristic]
included_services: List[Service]
characteristics: list[Characteristic]
included_services: list[Service]
def __init__(
self,
@@ -474,7 +474,7 @@ class Characteristic(Attribute[_T]):
# The check for `p.name is not None` here is needed because for InFlag
# enums, the .name property can be None, when the enum value is 0,
# so the type hint for .name is Optional[str].
enum_list: List[str] = [p.name for p in cls if p.name is not None]
enum_list: list[str] = [p.name for p in cls if p.name is not None]
enum_list_str = ",".join(enum_list)
raise TypeError(
f"Characteristic.Properties::from_string() error:\nExpected a string containing any of the keys, separated by , or |: {enum_list_str}\nGot: {properties_str}"

View File

@@ -28,7 +28,6 @@ from typing import (
Iterable,
Literal,
Optional,
Type,
TypeVar,
)
@@ -270,7 +269,7 @@ class SerializableCharacteristicAdapter(CharacteristicAdapter[_T2]):
`to_bytes` and `__bytes__` methods, respectively.
'''
def __init__(self, characteristic: Characteristic, cls: Type[_T2]) -> None:
def __init__(self, characteristic: Characteristic, cls: type[_T2]) -> None:
super().__init__(characteristic)
self.cls = cls
@@ -289,7 +288,7 @@ class SerializableCharacteristicProxyAdapter(CharacteristicProxyAdapter[_T2]):
'''
def __init__(
self, characteristic_proxy: CharacteristicProxy, cls: Type[_T2]
self, characteristic_proxy: CharacteristicProxy, cls: type[_T2]
) -> None:
super().__init__(characteristic_proxy)
self.cls = cls
@@ -311,7 +310,7 @@ class EnumCharacteristicAdapter(CharacteristicAdapter[_T3]):
def __init__(
self,
characteristic: Characteristic,
cls: Type[_T3],
cls: type[_T3],
length: int,
byteorder: Literal['little', 'big'] = 'little',
):
@@ -347,7 +346,7 @@ class EnumCharacteristicProxyAdapter(CharacteristicProxyAdapter[_T3]):
def __init__(
self,
characteristic_proxy: CharacteristicProxy,
cls: Type[_T3],
cls: type[_T3],
length: int,
byteorder: Literal['little', 'big'] = 'little',
):

View File

@@ -31,15 +31,10 @@ from datetime import datetime
from typing import (
Any,
Callable,
Dict,
Generic,
Iterable,
List,
Optional,
Set,
Tuple,
Union,
Type,
TypeVar,
TYPE_CHECKING,
)
@@ -149,8 +144,8 @@ class AttributeProxy(utils.EventEmitter, Generic[_T]):
class ServiceProxy(AttributeProxy):
uuid: UUID
characteristics: List[CharacteristicProxy[bytes]]
included_services: List[ServiceProxy]
characteristics: list[CharacteristicProxy[bytes]]
included_services: list[ServiceProxy]
@staticmethod
def from_client(service_class, client: Client, service_uuid: UUID):
@@ -199,8 +194,8 @@ class ServiceProxy(AttributeProxy):
class CharacteristicProxy(AttributeProxy[_T]):
properties: Characteristic.Properties
descriptors: List[DescriptorProxy]
subscribers: Dict[Any, Callable[[_T], Any]]
descriptors: list[DescriptorProxy]
subscribers: dict[Any, Callable[[_T], Any]]
EVENT_UPDATE = "update"
@@ -277,7 +272,7 @@ class ProfileServiceProxy:
Base class for profile-specific service proxies
'''
SERVICE_CLASS: Type[TemplateService]
SERVICE_CLASS: type[TemplateService]
@classmethod
def from_client(cls, client: Client) -> Optional[ProfileServiceProxy]:
@@ -288,13 +283,13 @@ class ProfileServiceProxy:
# GATT Client
# -----------------------------------------------------------------------------
class Client:
services: List[ServiceProxy]
cached_values: Dict[int, Tuple[datetime, bytes]]
notification_subscribers: Dict[
int, Set[Union[CharacteristicProxy, Callable[[bytes], Any]]]
services: list[ServiceProxy]
cached_values: dict[int, tuple[datetime, bytes]]
notification_subscribers: dict[
int, set[Union[CharacteristicProxy, Callable[[bytes], Any]]]
]
indication_subscribers: Dict[
int, Set[Union[CharacteristicProxy, Callable[[bytes], Any]]]
indication_subscribers: dict[
int, set[Union[CharacteristicProxy, Callable[[bytes], Any]]]
]
pending_response: Optional[asyncio.futures.Future[ATT_PDU]]
pending_request: Optional[ATT_PDU]
@@ -379,12 +374,12 @@ class Client:
return self.connection.att_mtu
def get_services_by_uuid(self, uuid: UUID) -> List[ServiceProxy]:
def get_services_by_uuid(self, uuid: UUID) -> list[ServiceProxy]:
return [service for service in self.services if service.uuid == uuid]
def get_characteristics_by_uuid(
self, uuid: UUID, service: Optional[ServiceProxy] = None
) -> List[CharacteristicProxy[bytes]]:
) -> list[CharacteristicProxy[bytes]]:
services = [service] if service else self.services
return [
c
@@ -395,8 +390,8 @@ class Client:
def get_attribute_grouping(self, attribute_handle: int) -> Optional[
Union[
ServiceProxy,
Tuple[ServiceProxy, CharacteristicProxy],
Tuple[ServiceProxy, CharacteristicProxy, DescriptorProxy],
tuple[ServiceProxy, CharacteristicProxy],
tuple[ServiceProxy, CharacteristicProxy, DescriptorProxy],
]
]:
"""
@@ -429,7 +424,7 @@ class Client:
if not already_known:
self.services.append(service)
async def discover_services(self, uuids: Iterable[UUID] = ()) -> List[ServiceProxy]:
async def discover_services(self, uuids: Iterable[UUID] = ()) -> list[ServiceProxy]:
'''
See Vol 3, Part G - 4.4.1 Discover All Primary Services
'''
@@ -501,7 +496,7 @@ class Client:
return services
async def discover_service(self, uuid: Union[str, UUID]) -> List[ServiceProxy]:
async def discover_service(self, uuid: Union[str, UUID]) -> list[ServiceProxy]:
'''
See Vol 3, Part G - 4.4.2 Discover Primary Service by Service UUID
'''
@@ -572,7 +567,7 @@ class Client:
async def discover_included_services(
self, service: ServiceProxy
) -> List[ServiceProxy]:
) -> list[ServiceProxy]:
'''
See Vol 3, Part G - 4.5.1 Find Included Services
'''
@@ -580,7 +575,7 @@ class Client:
starting_handle = service.handle
ending_handle = service.end_group_handle
included_services: List[ServiceProxy] = []
included_services: list[ServiceProxy] = []
while starting_handle <= ending_handle:
response = await self.send_request(
ATT_Read_By_Type_Request(
@@ -636,7 +631,7 @@ class Client:
async def discover_characteristics(
self, uuids, service: Optional[ServiceProxy]
) -> List[CharacteristicProxy[bytes]]:
) -> list[CharacteristicProxy[bytes]]:
'''
See Vol 3, Part G - 4.6.1 Discover All Characteristics of a Service and 4.6.2
Discover Characteristics by UUID
@@ -649,12 +644,12 @@ class Client:
services = [service] if service else self.services
# Perform characteristic discovery for each service
discovered_characteristics: List[CharacteristicProxy[bytes]] = []
discovered_characteristics: list[CharacteristicProxy[bytes]] = []
for service in services:
starting_handle = service.handle
ending_handle = service.end_group_handle
characteristics: List[CharacteristicProxy[bytes]] = []
characteristics: list[CharacteristicProxy[bytes]] = []
while starting_handle <= ending_handle:
response = await self.send_request(
ATT_Read_By_Type_Request(
@@ -725,7 +720,7 @@ class Client:
characteristic: Optional[CharacteristicProxy] = None,
start_handle: Optional[int] = None,
end_handle: Optional[int] = None,
) -> List[DescriptorProxy]:
) -> list[DescriptorProxy]:
'''
See Vol 3, Part G - 4.7.1 Discover All Characteristic Descriptors
'''
@@ -738,7 +733,7 @@ class Client:
else:
return []
descriptors: List[DescriptorProxy] = []
descriptors: list[DescriptorProxy] = []
while starting_handle <= ending_handle:
response = await self.send_request(
ATT_Find_Information_Request(
@@ -787,7 +782,7 @@ class Client:
return descriptors
async def discover_attributes(self) -> List[AttributeProxy[bytes]]:
async def discover_attributes(self) -> list[AttributeProxy[bytes]]:
'''
Discover all attributes, regardless of type
'''
@@ -1002,7 +997,7 @@ class Client:
async def read_characteristics_by_uuid(
self, uuid: UUID, service: Optional[ServiceProxy]
) -> List[bytes]:
) -> list[bytes]:
'''
See Vol 3, Part G - 4.8.2 Read Using Characteristic UUID
'''

View File

@@ -29,13 +29,9 @@ import logging
from collections import defaultdict
import struct
from typing import (
Dict,
Iterable,
List,
Optional,
Tuple,
TypeVar,
Type,
TYPE_CHECKING,
)
@@ -103,10 +99,10 @@ GATT_SERVER_DEFAULT_MAX_MTU = 517
# GATT Server
# -----------------------------------------------------------------------------
class Server(utils.EventEmitter):
attributes: List[Attribute]
services: List[Service]
attributes_by_handle: Dict[int, Attribute]
subscribers: Dict[int, Dict[int, bytes]]
attributes: list[Attribute]
services: list[Service]
attributes_by_handle: dict[int, Attribute]
subscribers: dict[int, dict[int, bytes]]
indication_semaphores: defaultdict[int, asyncio.Semaphore]
pending_confirmations: defaultdict[int, Optional[asyncio.futures.Future]]
@@ -136,7 +132,7 @@ class Server(utils.EventEmitter):
def next_handle(self) -> int:
return 1 + len(self.attributes)
def get_advertising_service_data(self) -> Dict[Attribute, bytes]:
def get_advertising_service_data(self) -> dict[Attribute, bytes]:
return {
attribute: data
for attribute in self.attributes
@@ -160,7 +156,7 @@ class Server(utils.EventEmitter):
AttributeGroupType = TypeVar('AttributeGroupType', Service, Characteristic)
def get_attribute_group(
self, handle: int, group_type: Type[AttributeGroupType]
self, handle: int, group_type: type[AttributeGroupType]
) -> Optional[AttributeGroupType]:
return next(
(
@@ -186,7 +182,7 @@ class Server(utils.EventEmitter):
def get_characteristic_attributes(
self, service_uuid: UUID, characteristic_uuid: UUID
) -> Optional[Tuple[CharacteristicDeclaration, Characteristic]]:
) -> Optional[tuple[CharacteristicDeclaration, Characteristic]]:
service_handle = self.get_service_attribute(service_uuid)
if not service_handle:
return None

View File

@@ -23,7 +23,7 @@ import functools
import logging
import secrets
import struct
from typing import Any, Callable, Dict, Iterable, List, Optional, Type, Union, ClassVar
from typing import Any, Callable, Iterable, Optional, Union, ClassVar
from typing_extensions import Self
from bumble import crypto
@@ -2269,8 +2269,8 @@ class HCI_Command(HCI_Packet):
'''
hci_packet_type = HCI_COMMAND_PACKET
command_names: Dict[int, str] = {}
command_classes: Dict[int, Type[HCI_Command]] = {}
command_names: dict[int, str] = {}
command_classes: dict[int, type[HCI_Command]] = {}
op_code: int
@staticmethod
@@ -2303,7 +2303,7 @@ class HCI_Command(HCI_Packet):
return inner
@staticmethod
def command_map(symbols: Dict[str, Any]) -> Dict[int, str]:
def command_map(symbols: dict[str, Any]) -> dict[int, str]:
return {
command_code: command_name
for (command_name, command_code) in symbols.items()
@@ -2311,7 +2311,7 @@ class HCI_Command(HCI_Packet):
}
@classmethod
def register_commands(cls, symbols: Dict[str, Any]) -> None:
def register_commands(cls, symbols: dict[str, Any]) -> None:
cls.command_names.update(cls.command_map(symbols))
@staticmethod
@@ -5055,13 +5055,13 @@ class HCI_LE_Set_CIG_Parameters_Command(HCI_Command):
framing: int
max_transport_latency_c_to_p: int
max_transport_latency_p_to_c: int
cis_id: List[int]
max_sdu_c_to_p: List[int]
max_sdu_p_to_c: List[int]
phy_c_to_p: List[int]
phy_p_to_c: List[int]
rtn_c_to_p: List[int]
rtn_p_to_c: List[int]
cis_id: list[int]
max_sdu_c_to_p: list[int]
max_sdu_p_to_c: list[int]
phy_c_to_p: list[int]
phy_p_to_c: list[int]
rtn_c_to_p: list[int]
rtn_p_to_c: list[int]
# -----------------------------------------------------------------------------
@@ -5078,8 +5078,8 @@ class HCI_LE_Create_CIS_Command(HCI_Command):
See Bluetooth spec @ 7.8.99 LE Create CIS command
'''
cis_connection_handle: List[int]
acl_connection_handle: List[int]
cis_connection_handle: list[int]
acl_connection_handle: list[int]
# -----------------------------------------------------------------------------
@@ -5198,7 +5198,7 @@ class HCI_LE_BIG_Create_Sync_Command(HCI_Command):
broadcast_code: int
mse: int
big_sync_timeout: int
bis: List[int]
bis: list[int]
# -----------------------------------------------------------------------------
@@ -5557,8 +5557,8 @@ class HCI_Event(HCI_Packet):
'''
hci_packet_type = HCI_EVENT_PACKET
event_names: Dict[int, str] = {}
event_classes: Dict[int, Type[HCI_Event]] = {}
event_names: dict[int, str] = {}
event_classes: dict[int, type[HCI_Event]] = {}
vendor_factories: list[Callable[[bytes], Optional[HCI_Event]]] = []
@staticmethod
@@ -5588,7 +5588,7 @@ class HCI_Event(HCI_Packet):
return inner
@staticmethod
def event_map(symbols: Dict[str, Any]) -> Dict[int, str]:
def event_map(symbols: dict[str, Any]) -> dict[int, str]:
return {
event_code: event_name
for (event_name, event_code) in symbols.items()
@@ -5602,7 +5602,7 @@ class HCI_Event(HCI_Packet):
return name_or_number(HCI_Event.event_names, event_code)
@staticmethod
def register_events(symbols: Dict[str, Any]) -> None:
def register_events(symbols: dict[str, Any]) -> None:
HCI_Event.event_names.update(HCI_Event.event_map(symbols))
@staticmethod
@@ -5710,8 +5710,8 @@ class HCI_Extended_Event(HCI_Event):
HCI_Event subclass for events that have a subevent code.
'''
subevent_names: Dict[int, str] = {}
subevent_classes: Dict[int, Type[HCI_Extended_Event]] = {}
subevent_names: dict[int, str] = {}
subevent_classes: dict[int, type[HCI_Extended_Event]] = {}
@classmethod
def event(cls, fields=()):
@@ -5750,7 +5750,7 @@ class HCI_Extended_Event(HCI_Event):
return f'{cls.__name__.upper()}[0x{subevent_code:02X}]'
@staticmethod
def subevent_map(symbols: Dict[str, Any]) -> Dict[int, str]:
def subevent_map(symbols: dict[str, Any]) -> dict[int, str]:
return {
subevent_code: subevent_name
for (subevent_name, subevent_code) in symbols.items()
@@ -5758,7 +5758,7 @@ class HCI_Extended_Event(HCI_Event):
}
@classmethod
def register_subevents(cls, symbols: Dict[str, Any]) -> None:
def register_subevents(cls, symbols: dict[str, Any]) -> None:
cls.subevent_names.update(cls.subevent_map(symbols))
@classmethod
@@ -5807,7 +5807,7 @@ class HCI_LE_Meta_Event(HCI_Extended_Event):
subevent_classes = {}
@staticmethod
def subevent_map(symbols: Dict[str, Any]) -> Dict[int, str]:
def subevent_map(symbols: dict[str, Any]) -> dict[int, str]:
return {
subevent_code: subevent_name
for (subevent_name, subevent_code) in symbols.items()

View File

@@ -26,14 +26,9 @@ import enum
import traceback
import re
from typing import (
Dict,
List,
Union,
Set,
Any,
Optional,
Type,
Tuple,
ClassVar,
Iterable,
TYPE_CHECKING,
@@ -375,7 +370,7 @@ class CallLineIdentification:
cli_validity: Optional[int] = None
@classmethod
def parse_from(cls: Type[Self], parameters: List[bytes]) -> Self:
def parse_from(cls, parameters: list[bytes]) -> Self:
return cls(
number=parameters[0].decode(),
type=int(parameters[1]),
@@ -505,9 +500,9 @@ STATUS_CODES = {
@dataclasses.dataclass
class HfConfiguration:
supported_hf_features: List[HfFeature]
supported_hf_indicators: List[HfIndicator]
supported_audio_codecs: List[AudioCodec]
supported_hf_features: list[HfFeature]
supported_hf_indicators: list[HfIndicator]
supported_audio_codecs: list[AudioCodec]
@dataclasses.dataclass
@@ -535,7 +530,7 @@ class AtResponse:
parameters: list
@classmethod
def parse_from(cls: Type[Self], buffer: bytearray) -> Self:
def parse_from(cls: type[Self], buffer: bytearray) -> Self:
code_and_parameters = buffer.split(b':')
parameters = (
code_and_parameters[1] if len(code_and_parameters) > 1 else bytearray()
@@ -563,7 +558,7 @@ class AtCommand:
)
@classmethod
def parse_from(cls: Type[Self], buffer: bytearray) -> Self:
def parse_from(cls: type[Self], buffer: bytearray) -> Self:
if not (match := cls._PARSE_PATTERN.fullmatch(buffer.decode())):
if buffer.startswith(b'ATA'):
return cls(code='A', sub_code=AtCommand.SubCode.NONE, parameters=[])
@@ -598,7 +593,7 @@ class AgIndicatorState:
"""
indicator: AgIndicator
supported_values: Set[int]
supported_values: set[int]
current_status: int
index: Optional[int] = None
enabled: bool = True
@@ -616,14 +611,14 @@ class AgIndicatorState:
return f'(\"{self.indicator.value}\",{supported_values_text})'
@classmethod
def call(cls: Type[Self]) -> Self:
def call(cls: type[Self]) -> Self:
"""Default call indicator state."""
return cls(
indicator=AgIndicator.CALL, supported_values={0, 1}, current_status=0
)
@classmethod
def callsetup(cls: Type[Self]) -> Self:
def callsetup(cls: type[Self]) -> Self:
"""Default callsetup indicator state."""
return cls(
indicator=AgIndicator.CALL_SETUP,
@@ -632,7 +627,7 @@ class AgIndicatorState:
)
@classmethod
def callheld(cls: Type[Self]) -> Self:
def callheld(cls: type[Self]) -> Self:
"""Default call indicator state."""
return cls(
indicator=AgIndicator.CALL_HELD,
@@ -641,14 +636,14 @@ class AgIndicatorState:
)
@classmethod
def service(cls: Type[Self]) -> Self:
def service(cls: type[Self]) -> Self:
"""Default service indicator state."""
return cls(
indicator=AgIndicator.SERVICE, supported_values={0, 1}, current_status=0
)
@classmethod
def signal(cls: Type[Self]) -> Self:
def signal(cls: type[Self]) -> Self:
"""Default signal indicator state."""
return cls(
indicator=AgIndicator.SIGNAL,
@@ -657,14 +652,14 @@ class AgIndicatorState:
)
@classmethod
def roam(cls: Type[Self]) -> Self:
def roam(cls: type[Self]) -> Self:
"""Default roam indicator state."""
return cls(
indicator=AgIndicator.CALL, supported_values={0, 1}, current_status=0
)
@classmethod
def battchg(cls: Type[Self]) -> Self:
def battchg(cls: type[Self]) -> Self:
"""Default battery charge indicator state."""
return cls(
indicator=AgIndicator.BATTERY_CHARGE,
@@ -732,13 +727,13 @@ class HfProtocol(utils.EventEmitter):
"""Termination signal for run() loop."""
supported_hf_features: int
supported_audio_codecs: List[AudioCodec]
supported_audio_codecs: list[AudioCodec]
supported_ag_features: int
supported_ag_call_hold_operations: List[CallHoldOperation]
supported_ag_call_hold_operations: list[CallHoldOperation]
ag_indicators: List[AgIndicatorState]
hf_indicators: Dict[HfIndicator, HfIndicatorState]
ag_indicators: list[AgIndicatorState]
hf_indicators: dict[HfIndicator, HfIndicatorState]
dlc: rfcomm.DLC
command_lock: asyncio.Lock
@@ -836,7 +831,7 @@ class HfProtocol(utils.EventEmitter):
cmd: str,
timeout: float = 1.0,
response_type: AtResponseType = AtResponseType.NONE,
) -> Union[None, AtResponse, List[AtResponse]]:
) -> Union[None, AtResponse, list[AtResponse]]:
"""
Sends an AT command and wait for the peer response.
Wait for the AT responses sent by the peer, to the status code.
@@ -853,7 +848,7 @@ class HfProtocol(utils.EventEmitter):
async with self.command_lock:
logger.debug(f">>> {cmd}")
self.dlc.write(cmd + '\r')
responses: List[AtResponse] = []
responses: list[AtResponse] = []
while True:
result = await asyncio.wait_for(
@@ -1073,7 +1068,7 @@ class HfProtocol(utils.EventEmitter):
# code, with the value indicating (call=0).
await self.execute_command("AT+CHUP")
async def query_current_calls(self) -> List[CallInfo]:
async def query_current_calls(self) -> list[CallInfo]:
"""4.32.1 Query List of Current Calls in AG.
Return:
@@ -1204,27 +1199,27 @@ class AgProtocol(utils.EventEmitter):
EVENT_MICROPHONE_VOLUME = "microphone_volume"
supported_hf_features: int
supported_hf_indicators: Set[HfIndicator]
supported_audio_codecs: List[AudioCodec]
supported_hf_indicators: set[HfIndicator]
supported_audio_codecs: list[AudioCodec]
supported_ag_features: int
supported_ag_call_hold_operations: List[CallHoldOperation]
supported_ag_call_hold_operations: list[CallHoldOperation]
ag_indicators: List[AgIndicatorState]
ag_indicators: list[AgIndicatorState]
hf_indicators: collections.OrderedDict[HfIndicator, HfIndicatorState]
dlc: rfcomm.DLC
read_buffer: bytearray
active_codec: AudioCodec
calls: List[CallInfo]
calls: list[CallInfo]
indicator_report_enabled: bool
inband_ringtone_enabled: bool
cme_error_enabled: bool
cli_notification_enabled: bool
call_waiting_enabled: bool
_remained_slc_setup_features: Set[HfFeature]
_remained_slc_setup_features: set[HfFeature]
def __init__(self, dlc: rfcomm.DLC, configuration: AgConfiguration) -> None:
super().__init__()
@@ -1694,7 +1689,7 @@ def make_hf_sdp_records(
rfcomm_channel: int,
configuration: HfConfiguration,
version: ProfileVersion = ProfileVersion.V1_8,
) -> List[sdp.ServiceAttribute]:
) -> list[sdp.ServiceAttribute]:
"""
Generates the SDP record for HFP Hands-Free support.
@@ -1780,7 +1775,7 @@ def make_ag_sdp_records(
rfcomm_channel: int,
configuration: AgConfiguration,
version: ProfileVersion = ProfileVersion.V1_8,
) -> List[sdp.ServiceAttribute]:
) -> list[sdp.ServiceAttribute]:
"""
Generates the SDP record for HFP Audio-Gateway support.
@@ -1860,7 +1855,7 @@ def make_ag_sdp_records(
async def find_hf_sdp_record(
connection: device.Connection,
) -> Optional[Tuple[int, ProfileVersion, HfSdpFeature]]:
) -> Optional[tuple[int, ProfileVersion, HfSdpFeature]]:
"""Searches a Hands-Free SDP record from remote device.
Args:
@@ -1912,7 +1907,7 @@ async def find_hf_sdp_record(
async def find_ag_sdp_record(
connection: device.Connection,
) -> Optional[Tuple[int, ProfileVersion, AgSdpFeature]]:
) -> Optional[tuple[int, ProfileVersion, AgSdpFeature]]:
"""Searches an Audio-Gateway SDP record from remote device.
Args:
@@ -2010,7 +2005,7 @@ class EscoParameters:
transmit_codec_frame_size: int = 60
receive_codec_frame_size: int = 60
def asdict(self) -> Dict[str, Any]:
def asdict(self) -> dict[str, Any]:
# dataclasses.asdict() will recursively deep-copy the entire object,
# which is expensive and breaks CodingFormat object, so let it simply copy here.
return self.__dict__

View File

@@ -26,10 +26,7 @@ from typing import (
Any,
Awaitable,
Callable,
Deque,
Dict,
Optional,
Set,
cast,
TYPE_CHECKING,
)
@@ -89,7 +86,9 @@ class DataPacketQueue(utils.EventEmitter):
int
) # Number of packets in flight per connection
self._send = send
self._packets: Deque[tuple[hci.HCI_Packet, int]] = collections.deque()
self._packets: collections.deque[tuple[hci.HCI_Packet, int]] = (
collections.deque()
)
self._queued = 0
self._completed = 0
@@ -234,16 +233,16 @@ class IsoLink:
# -----------------------------------------------------------------------------
class Host(utils.EventEmitter):
connections: Dict[int, Connection]
cis_links: Dict[int, IsoLink]
bis_links: Dict[int, IsoLink]
sco_links: Dict[int, ScoLink]
connections: dict[int, Connection]
cis_links: dict[int, IsoLink]
bis_links: dict[int, IsoLink]
sco_links: dict[int, ScoLink]
bigs: dict[int, set[int]]
acl_packet_queue: Optional[DataPacketQueue] = None
le_acl_packet_queue: Optional[DataPacketQueue] = None
iso_packet_queue: Optional[DataPacketQueue] = None
hci_sink: Optional[TransportSink] = None
hci_metadata: Dict[str, Any]
hci_metadata: dict[str, Any]
long_term_key_provider: Optional[
Callable[[int, bytes, int], Awaitable[Optional[bytes]]]
]
@@ -813,7 +812,7 @@ class Host(utils.EventEmitter):
) != 0
@property
def supported_commands(self) -> Set[int]:
def supported_commands(self) -> set[int]:
return set(
op_code
for op_code, mask in hci.HCI_SUPPORTED_COMMANDS_MASKS.items()

View File

@@ -26,7 +26,7 @@ import dataclasses
import logging
import os
import json
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Type, Any
from typing import TYPE_CHECKING, Optional, Any
from typing_extensions import Self
from bumble.colors import color
@@ -157,7 +157,7 @@ class KeyStore:
async def get(self, _name: str) -> Optional[PairingKeys]:
return None
async def get_all(self) -> List[Tuple[str, PairingKeys]]:
async def get_all(self) -> list[tuple[str, PairingKeys]]:
return []
async def delete_all(self) -> None:
@@ -272,7 +272,7 @@ class JsonKeyStore(KeyStore):
@classmethod
def from_device(
cls: Type[Self], device: Device, filename: Optional[str] = None
cls: type[Self], device: Device, filename: Optional[str] = None
) -> Self:
if not filename:
# Extract the filename from the config if there is one
@@ -356,7 +356,7 @@ class JsonKeyStore(KeyStore):
# -----------------------------------------------------------------------------
class MemoryKeyStore(KeyStore):
all_keys: Dict[str, PairingKeys]
all_keys: dict[str, PairingKeys]
def __init__(self) -> None:
self.all_keys = {}
@@ -371,5 +371,5 @@ class MemoryKeyStore(KeyStore):
async def get(self, name: str) -> Optional[PairingKeys]:
return self.all_keys.get(name)
async def get_all(self) -> List[Tuple[str, PairingKeys]]:
async def get_all(self) -> list[tuple[str, PairingKeys]]:
return list(self.all_keys.items())

View File

@@ -24,15 +24,10 @@ import struct
from collections import deque
from typing import (
Dict,
Type,
List,
Optional,
Tuple,
Callable,
Any,
Union,
Deque,
Iterable,
SupportsBytes,
TYPE_CHECKING,
@@ -242,7 +237,7 @@ class L2CAP_Control_Frame:
See Bluetooth spec @ Vol 3, Part A - 4 SIGNALING PACKET FORMATS
'''
classes: Dict[int, Type[L2CAP_Control_Frame]] = {}
classes: dict[int, type[L2CAP_Control_Frame]] = {}
code = 0
name: str
@@ -276,7 +271,7 @@ class L2CAP_Control_Frame:
return name_or_number(L2CAP_CONTROL_FRAME_NAMES, code)
@staticmethod
def decode_configuration_options(data: bytes) -> List[Tuple[int, bytes]]:
def decode_configuration_options(data: bytes) -> list[tuple[int, bytes]]:
options = []
while len(data) >= 2:
value_type = data[0]
@@ -288,7 +283,7 @@ class L2CAP_Control_Frame:
return options
@staticmethod
def encode_configuration_options(options: List[Tuple[int, bytes]]) -> bytes:
def encode_configuration_options(options: list[tuple[int, bytes]]) -> bytes:
return b''.join(
[bytes([option[0], len(option[1])]) + option[1] for option in options]
)
@@ -400,7 +395,7 @@ class L2CAP_Connection_Request(L2CAP_Control_Frame):
source_cid: int
@staticmethod
def parse_psm(data: bytes, offset: int = 0) -> Tuple[int, int]:
def parse_psm(data: bytes, offset: int = 0) -> tuple[int, int]:
psm_length = 2
psm = data[offset] | data[offset + 1] << 8
@@ -1041,7 +1036,7 @@ class LeCreditBasedChannel(utils.EventEmitter):
DISCONNECTED = 4
CONNECTION_ERROR = 5
out_queue: Deque[bytes]
out_queue: deque[bytes]
connection_result: Optional[asyncio.Future[LeCreditBasedChannel]]
disconnection_result: Optional[asyncio.Future[None]]
in_sdu: Optional[bytes]
@@ -1445,13 +1440,13 @@ class LeCreditBasedChannelServer(utils.EventEmitter):
# -----------------------------------------------------------------------------
class ChannelManager:
identifiers: Dict[int, int]
channels: Dict[int, Dict[int, Union[ClassicChannel, LeCreditBasedChannel]]]
servers: Dict[int, ClassicChannelServer]
le_coc_channels: Dict[int, Dict[int, LeCreditBasedChannel]]
le_coc_servers: Dict[int, LeCreditBasedChannelServer]
le_coc_requests: Dict[int, L2CAP_LE_Credit_Based_Connection_Request]
fixed_channels: Dict[int, Optional[Callable[[int, bytes], Any]]]
identifiers: dict[int, int]
channels: dict[int, dict[int, Union[ClassicChannel, LeCreditBasedChannel]]]
servers: dict[int, ClassicChannelServer]
le_coc_channels: dict[int, dict[int, LeCreditBasedChannel]]
le_coc_servers: dict[int, LeCreditBasedChannelServer]
le_coc_requests: dict[int, L2CAP_LE_Credit_Based_Connection_Request]
fixed_channels: dict[int, Optional[Callable[[int, bytes], Any]]]
_host: Optional[Host]
connection_parameters_update_response: Optional[asyncio.Future[int]]

View File

@@ -36,7 +36,7 @@ from bumble.hci import (
)
from bumble import controller
from typing import Optional, Set
from typing import Optional
# -----------------------------------------------------------------------------
# Logging
@@ -65,7 +65,7 @@ class LocalLink:
Link bus for controllers to communicate with each other
'''
controllers: Set[controller.Controller]
controllers: set[controller.Controller]
def __init__(self):
self.controllers = set()

View File

@@ -18,7 +18,7 @@
from __future__ import annotations
import enum
from dataclasses import dataclass
from typing import Optional, Tuple
from typing import Optional
from bumble.hci import (
Address,
@@ -205,7 +205,7 @@ class PairingDelegate:
# [LE only]
async def key_distribution_response(
self, peer_initiator_key_distribution: int, peer_responder_key_distribution: int
) -> Tuple[int, int]:
) -> tuple[int, int]:
"""
Return the key distribution response in an SMP protocol context.

View File

@@ -45,7 +45,7 @@ __all__ = [
# Add servicers hooks.
_SERVICERS_HOOKS: List[Callable[[PandoraDevice, Config, grpc.aio.Server], None]] = []
_SERVICERS_HOOKS: list[Callable[[PandoraDevice, Config, grpc.aio.Server], None]] = []
def register_servicer_hook(

View File

@@ -15,7 +15,7 @@
from __future__ import annotations
from bumble.pairing import PairingConfig, PairingDelegate
from dataclasses import dataclass
from typing import Any, Dict
from typing import Any
@dataclass
@@ -32,7 +32,7 @@ class Config:
PairingDelegate.DEFAULT_KEY_DISTRIBUTION
)
def load_from_dict(self, config: Dict[str, Any]) -> None:
def load_from_dict(self, config: dict[str, Any]) -> None:
io_capability_name: str = config.get(
'io_capability', 'no_output_no_input'
).upper()

View File

@@ -32,7 +32,7 @@ from bumble.sdp import (
DataElement,
ServiceAttribute,
)
from typing import Any, Dict, List, Optional
from typing import Any, Optional
# Default rootcanal HCI TCP address
@@ -49,13 +49,13 @@ class PandoraDevice:
# Bumble device instance & configuration.
device: Device
config: Dict[str, Any]
config: dict[str, Any]
# HCI transport name & instance.
_hci_name: str
_hci: Optional[transport.Transport] # type: ignore[name-defined]
def __init__(self, config: Dict[str, Any]) -> None:
def __init__(self, config: dict[str, Any]) -> None:
self.config = config
self.device = _make_device(config)
self._hci_name = config.get(
@@ -95,14 +95,14 @@ class PandoraDevice:
await self.close()
await self.open()
def info(self) -> Optional[Dict[str, str]]:
def info(self) -> Optional[dict[str, str]]:
return {
'public_bd_address': str(self.device.public_address),
'random_address': str(self.device.random_address),
}
def _make_device(config: Dict[str, Any]) -> Device:
def _make_device(config: dict[str, Any]) -> Device:
"""Initialize an idle Bumble device instance."""
# initialize bumble device.
@@ -117,7 +117,7 @@ def _make_device(config: Dict[str, Any]) -> Device:
# TODO(b/267540823): remove when Pandora A2dp is supported
def _make_sdp_records(rfcomm_channel: int) -> Dict[int, List[ServiceAttribute]]:
def _make_sdp_records(rfcomm_channel: int) -> dict[int, list[ServiceAttribute]]:
return {
0x00010001: [
ServiceAttribute(

View File

@@ -73,7 +73,6 @@ from pandora.host_pb2 import (
ConnectResponse,
DataTypes,
DisconnectRequest,
DiscoverabilityMode,
InquiryResponse,
PrimaryPhy,
ReadLocalAddressResponse,
@@ -86,9 +85,9 @@ from pandora.host_pb2 import (
WaitConnectionResponse,
WaitDisconnectionRequest,
)
from typing import AsyncGenerator, Dict, List, Optional, Set, Tuple, cast
from typing import AsyncGenerator, Optional, cast
PRIMARY_PHY_MAP: Dict[int, PrimaryPhy] = {
PRIMARY_PHY_MAP: dict[int, PrimaryPhy] = {
# Default value reported by Bumble for legacy Advertising reports.
# FIXME(uael): `None` might be a better value, but Bumble need to change accordingly.
0: PRIMARY_1M,
@@ -96,26 +95,26 @@ PRIMARY_PHY_MAP: Dict[int, PrimaryPhy] = {
3: PRIMARY_CODED,
}
SECONDARY_PHY_MAP: Dict[int, SecondaryPhy] = {
SECONDARY_PHY_MAP: dict[int, SecondaryPhy] = {
0: SECONDARY_NONE,
1: SECONDARY_1M,
2: SECONDARY_2M,
3: SECONDARY_CODED,
}
PRIMARY_PHY_TO_BUMBLE_PHY_MAP: Dict[PrimaryPhy, Phy] = {
PRIMARY_PHY_TO_BUMBLE_PHY_MAP: dict[PrimaryPhy, Phy] = {
PRIMARY_1M: Phy.LE_1M,
PRIMARY_CODED: Phy.LE_CODED,
}
SECONDARY_PHY_TO_BUMBLE_PHY_MAP: Dict[SecondaryPhy, Phy] = {
SECONDARY_PHY_TO_BUMBLE_PHY_MAP: dict[SecondaryPhy, Phy] = {
SECONDARY_NONE: Phy.LE_1M,
SECONDARY_1M: Phy.LE_1M,
SECONDARY_2M: Phy.LE_2M,
SECONDARY_CODED: Phy.LE_CODED,
}
OWN_ADDRESS_MAP: Dict[host_pb2.OwnAddressType, OwnAddressType] = {
OWN_ADDRESS_MAP: dict[host_pb2.OwnAddressType, OwnAddressType] = {
host_pb2.PUBLIC: OwnAddressType.PUBLIC,
host_pb2.RANDOM: OwnAddressType.RANDOM,
host_pb2.RESOLVABLE_OR_PUBLIC: OwnAddressType.RESOLVABLE_OR_PUBLIC,
@@ -124,7 +123,7 @@ OWN_ADDRESS_MAP: Dict[host_pb2.OwnAddressType, OwnAddressType] = {
class HostService(HostServicer):
waited_connections: Set[int]
waited_connections: set[int]
def __init__(
self, grpc_server: grpc.aio.Server, device: Device, config: Config
@@ -618,7 +617,7 @@ class HostService(HostServicer):
self.log.debug('Inquiry')
inquiry_queue: asyncio.Queue[
Optional[Tuple[Address, int, AdvertisingData, int]]
Optional[tuple[Address, int, AdvertisingData, int]]
] = asyncio.Queue()
complete_handler = self.device.on(
self.device.EVENT_INQUIRY_COMPLETE, lambda: inquiry_queue.put_nowait(None)
@@ -670,10 +669,10 @@ class HostService(HostServicer):
return empty_pb2.Empty()
def unpack_data_types(self, dt: DataTypes) -> AdvertisingData:
ad_structures: List[Tuple[int, bytes]] = []
ad_structures: list[tuple[int, bytes]] = []
uuids: List[str]
datas: Dict[str, bytes]
uuids: list[str]
datas: dict[str, bytes]
def uuid128_from_str(uuid: str) -> bytes:
"""Decode a 128-bit uuid encoded as XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX
@@ -887,50 +886,50 @@ class HostService(HostServicer):
def pack_data_types(self, ad: AdvertisingData) -> DataTypes:
dt = DataTypes()
uuids: List[UUID]
uuids: list[UUID]
s: str
i: int
ij: Tuple[int, int]
uuid_data: Tuple[UUID, bytes]
ij: tuple[int, int]
uuid_data: tuple[UUID, bytes]
data: bytes
if uuids := cast(
List[UUID],
list[UUID],
ad.get(AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS),
):
dt.incomplete_service_class_uuids16.extend(
list(map(lambda x: x.to_hex_str('-'), uuids))
)
if uuids := cast(
List[UUID],
list[UUID],
ad.get(AdvertisingData.COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS),
):
dt.complete_service_class_uuids16.extend(
list(map(lambda x: x.to_hex_str('-'), uuids))
)
if uuids := cast(
List[UUID],
list[UUID],
ad.get(AdvertisingData.INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS),
):
dt.incomplete_service_class_uuids32.extend(
list(map(lambda x: x.to_hex_str('-'), uuids))
)
if uuids := cast(
List[UUID],
list[UUID],
ad.get(AdvertisingData.COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS),
):
dt.complete_service_class_uuids32.extend(
list(map(lambda x: x.to_hex_str('-'), uuids))
)
if uuids := cast(
List[UUID],
list[UUID],
ad.get(AdvertisingData.INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS),
):
dt.incomplete_service_class_uuids128.extend(
list(map(lambda x: x.to_hex_str('-'), uuids))
)
if uuids := cast(
List[UUID],
list[UUID],
ad.get(AdvertisingData.COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS),
):
dt.complete_service_class_uuids128.extend(
@@ -945,42 +944,42 @@ class HostService(HostServicer):
if i := cast(int, ad.get(AdvertisingData.CLASS_OF_DEVICE)):
dt.class_of_device = i
if ij := cast(
Tuple[int, int],
tuple[int, int],
ad.get(AdvertisingData.PERIPHERAL_CONNECTION_INTERVAL_RANGE),
):
dt.peripheral_connection_interval_min = ij[0]
dt.peripheral_connection_interval_max = ij[1]
if uuids := cast(
List[UUID],
list[UUID],
ad.get(AdvertisingData.LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS),
):
dt.service_solicitation_uuids16.extend(
list(map(lambda x: x.to_hex_str('-'), uuids))
)
if uuids := cast(
List[UUID],
list[UUID],
ad.get(AdvertisingData.LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS),
):
dt.service_solicitation_uuids32.extend(
list(map(lambda x: x.to_hex_str('-'), uuids))
)
if uuids := cast(
List[UUID],
list[UUID],
ad.get(AdvertisingData.LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS),
):
dt.service_solicitation_uuids128.extend(
list(map(lambda x: x.to_hex_str('-'), uuids))
)
if uuid_data := cast(
Tuple[UUID, bytes], ad.get(AdvertisingData.SERVICE_DATA_16_BIT_UUID)
tuple[UUID, bytes], ad.get(AdvertisingData.SERVICE_DATA_16_BIT_UUID)
):
dt.service_data_uuid16[uuid_data[0].to_hex_str('-')] = uuid_data[1]
if uuid_data := cast(
Tuple[UUID, bytes], ad.get(AdvertisingData.SERVICE_DATA_32_BIT_UUID)
tuple[UUID, bytes], ad.get(AdvertisingData.SERVICE_DATA_32_BIT_UUID)
):
dt.service_data_uuid32[uuid_data[0].to_hex_str('-')] = uuid_data[1]
if uuid_data := cast(
Tuple[UUID, bytes], ad.get(AdvertisingData.SERVICE_DATA_128_BIT_UUID)
tuple[UUID, bytes], ad.get(AdvertisingData.SERVICE_DATA_128_BIT_UUID)
):
dt.service_data_uuid128[uuid_data[0].to_hex_str('-')] = uuid_data[1]
if data := cast(bytes, ad.get(AdvertisingData.PUBLIC_TARGET_ADDRESS, raw=True)):

View File

@@ -51,7 +51,7 @@ from pandora.l2cap_pb2 import ( # pytype: disable=pyi-error
WaitDisconnectionRequest,
WaitDisconnectionResponse,
)
from typing import AsyncGenerator, Dict, Optional, Union
from typing import AsyncGenerator, Optional, Union
from dataclasses import dataclass
L2capChannel = Union[ClassicChannel, LeCreditBasedChannel]
@@ -70,7 +70,7 @@ class L2CAPService(L2CAPServicer):
)
self.device = device
self.config = config
self.channels: Dict[bytes, ChannelContext] = {}
self.channels: dict[bytes, ChannelContext] = {}
def register_event(self, l2cap_channel: L2capChannel) -> ChannelContext:
close_future = asyncio.get_running_loop().create_future()

View File

@@ -57,7 +57,7 @@ from pandora.security_pb2 import (
WaitSecurityRequest,
WaitSecurityResponse,
)
from typing import Any, AsyncGenerator, AsyncIterator, Callable, Dict, Optional, Union
from typing import Any, AsyncGenerator, AsyncIterator, Callable, Optional, Union
class PairingDelegate(BasePairingDelegate):
@@ -457,7 +457,7 @@ class SecurityService(SecurityServicer):
if self.need_pairing(connection, level):
pair_task = asyncio.create_task(connection.pair())
listeners: Dict[str, Callable[..., Union[None, Awaitable[None]]]] = {
listeners: dict[str, Callable[..., Union[None, Awaitable[None]]]] = {
'disconnection': set_failure('connection_died'),
'pairing_failure': set_failure('pairing_failure'),
'connection_authentication_failure': set_failure('authentication_failure'),

View File

@@ -22,9 +22,9 @@ import logging
from bumble.device import Device
from bumble.hci import Address, AddressType
from google.protobuf.message import Message # pytype: disable=pyi-error
from typing import Any, Dict, Generator, MutableMapping, Optional, Tuple
from typing import Any, Generator, MutableMapping, Optional
ADDRESS_TYPES: Dict[str, AddressType] = {
ADDRESS_TYPES: dict[str, AddressType] = {
"public": Address.PUBLIC_DEVICE_ADDRESS,
"random": Address.RANDOM_DEVICE_ADDRESS,
"public_identity": Address.PUBLIC_IDENTITY_ADDRESS,
@@ -43,7 +43,7 @@ class BumbleServerLoggerAdapter(logging.LoggerAdapter): # type: ignore
def process(
self, msg: str, kwargs: MutableMapping[str, Any]
) -> Tuple[str, MutableMapping[str, Any]]:
) -> tuple[str, MutableMapping[str, Any]]:
assert self.extra
service_name = self.extra['service_name']
assert isinstance(service_name, str)

View File

@@ -19,7 +19,7 @@
import enum
import struct
import logging
from typing import List, Optional, Callable, Union, Any
from typing import Optional, Callable, Union, Any
from bumble import l2cap
from bumble import utils
@@ -103,7 +103,7 @@ class AshaService(gatt.TemplateService):
def __init__(
self,
capability: int,
hisyncid: Union[List[int], bytes],
hisyncid: Union[list[int], bytes],
device: Device,
psm: int = 0,
audio_sink: Optional[Callable[[bytes], Any]] = None,

View File

@@ -24,7 +24,6 @@ import enum
import struct
import functools
import logging
from typing import List
from typing_extensions import Self
from bumble import core
@@ -282,7 +281,7 @@ class UnicastServerAdvertisingData:
# -----------------------------------------------------------------------------
def bits_to_channel_counts(data: int) -> List[int]:
def bits_to_channel_counts(data: int) -> list[int]:
pos = 0
counts = []
while data != 0:
@@ -527,7 +526,7 @@ class BasicAudioAnnouncement:
codec_id: hci.CodingFormat
codec_specific_configuration: CodecSpecificConfiguration
metadata: le_audio.Metadata
bis: List[BasicAudioAnnouncement.BIS]
bis: list[BasicAudioAnnouncement.BIS]
def __bytes__(self) -> bytes:
metadata_bytes = bytes(self.metadata)
@@ -545,7 +544,7 @@ class BasicAudioAnnouncement:
)
presentation_delay: int
subgroups: List[BasicAudioAnnouncement.Subgroup]
subgroups: list[BasicAudioAnnouncement.Subgroup]
@classmethod
def from_bytes(cls, data: bytes) -> Self:

View File

@@ -19,7 +19,7 @@
from __future__ import annotations
import enum
import struct
from typing import Optional, Tuple
from typing import Optional
from bumble import core
from bumble import crypto
@@ -228,7 +228,7 @@ class CoordinatedSetIdentificationProxy(gatt_client.ProfileServiceProxy):
):
self.set_member_rank = characteristics[0]
async def read_set_identity_resolving_key(self) -> Tuple[SirkType, bytes]:
async def read_set_identity_resolving_key(self) -> tuple[SirkType, bytes]:
'''Reads SIRK and decrypts if encrypted.'''
response = await self.set_identity_resolving_key.read_value()
if len(response) != SET_IDENTITY_RESOLVING_KEY_LENGTH + 1:

View File

@@ -17,7 +17,7 @@
# Imports
# -----------------------------------------------------------------------------
import struct
from typing import Optional, Tuple
from typing import Optional
from bumble.gatt import (
GATT_DEVICE_INFORMATION_SERVICE,
@@ -60,7 +60,7 @@ class DeviceInformationService(TemplateService):
hardware_revision: Optional[str] = None,
firmware_revision: Optional[str] = None,
software_revision: Optional[str] = None,
system_id: Optional[Tuple[int, int]] = None, # (OUI, Manufacturer ID)
system_id: Optional[tuple[int, int]] = None, # (OUI, Manufacturer ID)
ieee_regulatory_certification_data_list: Optional[bytes] = None,
# TODO: pnp_id
):

View File

@@ -19,7 +19,7 @@
# -----------------------------------------------------------------------------
import logging
import struct
from typing import Optional, Tuple, Union
from typing import Optional, Union
from bumble.core import Appearance
from bumble.gatt import (
@@ -54,7 +54,7 @@ class GenericAccessService(TemplateService):
appearance_characteristic: Characteristic[bytes]
def __init__(
self, device_name: str, appearance: Union[Appearance, Tuple[int, int], int] = 0
self, device_name: str, appearance: Union[Appearance, tuple[int, int], int] = 0
):
if isinstance(appearance, int):
appearance_int = appearance

View File

@@ -20,7 +20,7 @@ import asyncio
import functools
from dataclasses import dataclass, field
import logging
from typing import Any, Dict, List, Optional, Set, Union
from typing import Any, Optional, Union
from bumble import att, gatt, gatt_adapters, gatt_client
from bumble.core import InvalidArgumentError, InvalidStateError
@@ -228,25 +228,25 @@ class HearingAccessService(gatt.TemplateService):
hearing_aid_preset_control_point: gatt.Characteristic[bytes]
active_preset_index_characteristic: gatt.Characteristic[bytes]
active_preset_index: int
active_preset_index_per_device: Dict[Address, int]
active_preset_index_per_device: dict[Address, int]
device: Device
server_features: HearingAidFeatures
preset_records: Dict[int, PresetRecord] # key is the preset index
preset_records: dict[int, PresetRecord] # key is the preset index
read_presets_request_in_progress: bool
other_server_in_binaural_set: Optional[HearingAccessService] = None
preset_changed_operations_history_per_device: Dict[
Address, List[PresetChangedOperation]
preset_changed_operations_history_per_device: dict[
Address, list[PresetChangedOperation]
]
# Keep an updated list of connected client to send notification to
currently_connected_clients: Set[Connection]
currently_connected_clients: set[Connection]
def __init__(
self, device: Device, features: HearingAidFeatures, presets: List[PresetRecord]
self, device: Device, features: HearingAidFeatures, presets: list[PresetRecord]
) -> None:
self.active_preset_index_per_device = {}
self.read_presets_request_in_progress = False
@@ -381,7 +381,7 @@ class HearingAccessService(gatt.TemplateService):
utils.AsyncRunner.spawn(self._read_preset_response(connection, presets))
async def _read_preset_response(
self, connection: Connection, presets: List[PresetRecord]
self, connection: Connection, presets: list[PresetRecord]
):
# If the ATT bearer is terminated before all notifications or indications are sent, then the server shall consider the Read Presets Request operation aborted and shall not either continue or restart the operation when the client reconnects.
try:

View File

@@ -19,7 +19,7 @@ from __future__ import annotations
import dataclasses
import enum
import struct
from typing import Any, List, Type
from typing import Any
from typing_extensions import Self
from bumble.profiles import bap
@@ -108,13 +108,13 @@ class Metadata:
return self.data
@classmethod
def from_bytes(cls: Type[Self], data: bytes) -> Self:
def from_bytes(cls: type[Self], data: bytes) -> Self:
return cls(tag=Metadata.Tag(data[0]), data=data[1:])
def __bytes__(self) -> bytes:
return bytes([len(self.data) + 1, self.tag]) + self.data
entries: List[Entry] = dataclasses.field(default_factory=list)
entries: list[Entry] = dataclasses.field(default_factory=list)
def pretty_print(self, indent: str) -> str:
"""Convenience method to generate a string with one key-value pair per line."""
@@ -140,7 +140,7 @@ class Metadata:
)
@classmethod
def from_bytes(cls: Type[Self], data: bytes) -> Self:
def from_bytes(cls: type[Self], data: bytes) -> Self:
entries = []
offset = 0
length = len(data)

View File

@@ -29,7 +29,7 @@ from bumble import gatt
from bumble import gatt_client
from bumble import utils
from typing import Type, Optional, ClassVar, Dict, TYPE_CHECKING
from typing import Optional, ClassVar, TYPE_CHECKING
from typing_extensions import Self
# -----------------------------------------------------------------------------
@@ -167,7 +167,7 @@ class ObjectId(int):
'''See Media Control Service 4.4.2. Object ID field.'''
@classmethod
def create_from_bytes(cls: Type[Self], data: bytes) -> Self:
def create_from_bytes(cls: type[Self], data: bytes) -> Self:
return cls(int.from_bytes(data, byteorder='little', signed=False))
def __bytes__(self) -> bytes:
@@ -182,7 +182,7 @@ class GroupObjectType:
object_id: ObjectId
@classmethod
def from_bytes(cls: Type[Self], data: bytes) -> Self:
def from_bytes(cls: type[Self], data: bytes) -> Self:
return cls(
object_type=ObjectType(data[0]),
object_id=ObjectId.create_from_bytes(data[1:]),
@@ -310,7 +310,7 @@ class MediaControlServiceProxy(
):
SERVICE_CLASS = MediaControlService
_CHARACTERISTICS: ClassVar[Dict[str, core.UUID]] = {
_CHARACTERISTICS: ClassVar[dict[str, core.UUID]] = {
'media_player_name': gatt.GATT_MEDIA_PLAYER_NAME_CHARACTERISTIC,
'media_player_icon_object_id': gatt.GATT_MEDIA_PLAYER_ICON_OBJECT_ID_CHARACTERISTIC,
'media_player_icon_url': gatt.GATT_MEDIA_PLAYER_ICON_URL_CHARACTERISTIC,

View File

@@ -20,7 +20,7 @@ from __future__ import annotations
import dataclasses
import enum
from typing import Optional, Sequence
from typing import Sequence
from bumble import att
from bumble import utils

View File

@@ -22,7 +22,7 @@ import asyncio
import collections
import dataclasses
import enum
from typing import Callable, Dict, List, Optional, Tuple, Union, TYPE_CHECKING
from typing import Callable, Optional, Union, TYPE_CHECKING
from typing_extensions import Self
@@ -123,7 +123,7 @@ RFCOMM_DYNAMIC_CHANNEL_NUMBER_END = 30
# -----------------------------------------------------------------------------
def make_service_sdp_records(
service_record_handle: int, channel: int, uuid: Optional[UUID] = None
) -> List[sdp.ServiceAttribute]:
) -> list[sdp.ServiceAttribute]:
"""
Create SDP records for an RFComm service given a channel number and an
optional UUID. A Service Class Attribute is included only if the UUID is not None.
@@ -169,7 +169,7 @@ def make_service_sdp_records(
# -----------------------------------------------------------------------------
async def find_rfcomm_channels(connection: Connection) -> Dict[int, List[UUID]]:
async def find_rfcomm_channels(connection: Connection) -> dict[int, list[UUID]]:
"""Searches all RFCOMM channels and their associated UUID from SDP service records.
Args:
@@ -188,7 +188,7 @@ async def find_rfcomm_channels(connection: Connection) -> Dict[int, List[UUID]]:
],
)
for attribute_lists in search_result:
service_classes: List[UUID] = []
service_classes: list[UUID] = []
channel: Optional[int] = None
for attribute in attribute_lists:
# The layout is [[L2CAP_PROTOCOL], [RFCOMM_PROTOCOL, RFCOMM_CHANNEL]].
@@ -275,7 +275,7 @@ class RFCOMM_Frame:
self.fcs = compute_fcs(bytes([self.address, self.control]) + self.length)
@staticmethod
def parse_mcc(data) -> Tuple[int, bool, bytes]:
def parse_mcc(data) -> tuple[int, bool, bytes]:
mcc_type = data[0] >> 2
c_r = bool((data[0] >> 1) & 1)
length = data[1]
@@ -771,8 +771,8 @@ class Multiplexer(utils.EventEmitter):
connection_result: Optional[asyncio.Future]
disconnection_result: Optional[asyncio.Future]
open_result: Optional[asyncio.Future]
acceptor: Optional[Callable[[int], Optional[Tuple[int, int]]]]
dlcs: Dict[int, DLC]
acceptor: Optional[Callable[[int], Optional[tuple[int, int]]]]
dlcs: dict[int, DLC]
def __init__(self, l2cap_channel: l2cap.ClassicChannel, role: Role) -> None:
super().__init__()
@@ -1088,8 +1088,8 @@ class Server(utils.EventEmitter):
) -> None:
super().__init__()
self.device = device
self.acceptors: Dict[int, Callable[[DLC], None]] = {}
self.dlc_configs: Dict[int, Tuple[int, int]] = {}
self.acceptors: dict[int, Callable[[DLC], None]] = {}
self.dlc_configs: dict[int, tuple[int, int]] = {}
# Register ourselves with the L2CAP channel manager
self.l2cap_server = device.create_l2cap_server(
@@ -1144,7 +1144,7 @@ class Server(utils.EventEmitter):
# Notify
self.emit(self.EVENT_START, multiplexer)
def accept_dlc(self, channel_number: int) -> Optional[Tuple[int, int]]:
def accept_dlc(self, channel_number: int) -> Optional[tuple[int, int]]:
return self.dlc_configs.get(channel_number)
def on_dlc(self, dlc: DLC) -> None:

View File

@@ -17,7 +17,6 @@
# -----------------------------------------------------------------------------
from __future__ import annotations
import struct
from typing import List
# -----------------------------------------------------------------------------
@@ -60,7 +59,7 @@ class MediaPacket:
sequence_number: int,
timestamp: int,
ssrc: int,
csrc_list: List[int],
csrc_list: list[int],
payload_type: int,
payload: bytes,
) -> None:

View File

@@ -19,7 +19,7 @@ from __future__ import annotations
import asyncio
import logging
import struct
from typing import Iterable, NewType, Optional, Union, Sequence, Type, TYPE_CHECKING
from typing import Iterable, NewType, Optional, Union, Sequence, TYPE_CHECKING
from typing_extensions import Self
from bumble import core, l2cap
@@ -547,7 +547,7 @@ class SDP_PDU:
SDP_SERVICE_ATTRIBUTE_REQUEST: SDP_SERVICE_ATTRIBUTE_RESPONSE,
SDP_SERVICE_SEARCH_ATTRIBUTE_REQUEST: SDP_SERVICE_SEARCH_ATTRIBUTE_RESPONSE,
}
sdp_pdu_classes: dict[int, Type[SDP_PDU]] = {}
sdp_pdu_classes: dict[int, type[SDP_PDU]] = {}
name = None
pdu_id = 0

View File

@@ -33,11 +33,7 @@ from typing import (
Any,
Awaitable,
Callable,
Dict,
List,
Optional,
Tuple,
Type,
cast,
)
@@ -210,7 +206,7 @@ class SMP_Command:
See Bluetooth spec @ Vol 3, Part H - 3 SECURITY MANAGER PROTOCOL
'''
smp_classes: Dict[int, Type[SMP_Command]] = {}
smp_classes: dict[int, type[SMP_Command]] = {}
fields: Any
code = 0
name = ''
@@ -254,7 +250,7 @@ class SMP_Command:
@staticmethod
def key_distribution_str(value: int) -> str:
key_types: List[str] = []
key_types: list[str] = []
if value & SMP_ENC_KEY_DISTRIBUTION_FLAG:
key_types.append('ENC')
if value & SMP_ID_KEY_DISTRIBUTION_FLAG:
@@ -706,7 +702,7 @@ class Session:
self.peer_identity_resolving_key = None
self.peer_bd_addr: Optional[Address] = None
self.peer_signature_key = None
self.peer_expected_distributions: List[Type[SMP_Command]] = []
self.peer_expected_distributions: list[type[SMP_Command]] = []
self.dh_key = b''
self.confirm_value = None
self.passkey: Optional[int] = None
@@ -814,7 +810,7 @@ class Session:
self.tk = bytes(16)
@property
def pkx(self) -> Tuple[bytes, bytes]:
def pkx(self) -> tuple[bytes, bytes]:
return (self.ecc_key.x[::-1], self.peer_public_key_x)
@property
@@ -826,7 +822,7 @@ class Session:
return self.pkx[0 if self.is_responder else 1]
@property
def nx(self) -> Tuple[bytes, bytes]:
def nx(self) -> tuple[bytes, bytes]:
assert self.peer_random_value
return (self.r, self.peer_random_value)
@@ -1269,7 +1265,7 @@ class Session:
f'{[c.__name__ for c in self.peer_expected_distributions]}'
)
def check_key_distribution(self, command_class: Type[SMP_Command]) -> None:
def check_key_distribution(self, command_class: type[SMP_Command]) -> None:
# First, check that the connection is encrypted
if not self.connection.is_encrypted:
logger.warning(
@@ -1938,9 +1934,9 @@ class Manager(utils.EventEmitter):
'''
device: Device
sessions: Dict[int, Session]
sessions: dict[int, Session]
pairing_config_factory: Callable[[Connection], PairingConfig]
session_proxy: Type[Session]
session_proxy: type[Session]
_ecc_key: Optional[crypto.EccKey]
def __init__(

View File

@@ -22,7 +22,7 @@ import os
import pathlib
import platform
import sys
from typing import Dict, Optional
from typing import Optional
import grpc.aio
@@ -143,7 +143,7 @@ def publish_grpc_port(grpc_port: int, instance_number: int) -> bool:
# -----------------------------------------------------------------------------
async def open_android_netsim_controller_transport(
server_host: Optional[str], server_port: int, options: Dict[str, str]
server_host: Optional[str], server_port: int, options: dict[str, str]
) -> Transport:
if not server_port:
raise TransportSpecError('invalid port')
@@ -288,7 +288,7 @@ async def open_android_netsim_controller_transport(
async def open_android_netsim_host_transport_with_address(
server_host: Optional[str],
server_port: int,
options: Optional[Dict[str, str]] = None,
options: Optional[dict[str, str]] = None,
):
if server_host == '_' or not server_host:
server_host = 'localhost'
@@ -313,7 +313,7 @@ async def open_android_netsim_host_transport_with_address(
# -----------------------------------------------------------------------------
async def open_android_netsim_host_transport_with_channel(
channel, options: Optional[Dict[str, str]] = None
channel, options: Optional[dict[str, str]] = None
):
# Wrapper for I/O operations
class HciDevice:
@@ -451,7 +451,7 @@ async def open_android_netsim_transport(spec: Optional[str]) -> Transport:
port = 0
params_offset = 0
options: Dict[str, str] = {}
options: dict[str, str] = {}
for param in params[params_offset:]:
if '=' not in param:
raise TransportSpecError('invalid parameter, expected <name>=<value>')

View File

@@ -21,7 +21,7 @@ import struct
import asyncio
import logging
import io
from typing import Any, ContextManager, Tuple, Optional, Protocol, Dict
from typing import Any, ContextManager, Optional, Protocol
from bumble import core
from bumble import hci
@@ -38,7 +38,7 @@ logger = logging.getLogger(__name__)
# Information needed to parse HCI packets with a generic parser:
# For each packet type, the info represents:
# (length-size, length-offset, unpack-type)
HCI_PACKET_INFO: Dict[int, Tuple[int, int, str]] = {
HCI_PACKET_INFO: dict[int, tuple[int, int, str]] = {
hci.HCI_COMMAND_PACKET: (1, 2, 'B'),
hci.HCI_ACL_DATA_PACKET: (2, 2, 'H'),
hci.HCI_SYNCHRONOUS_DATA_PACKET: (1, 2, 'B'),
@@ -108,8 +108,8 @@ class PacketParser:
NEED_BODY = 2
sink: Optional[TransportSink]
extended_packet_info: Dict[int, Tuple[int, int, str]]
packet_info: Optional[Tuple[int, int, str]] = None
extended_packet_info: dict[int, tuple[int, int, str]]
packet_info: Optional[tuple[int, int, str]] = None
def __init__(self, sink: Optional[TransportSink] = None) -> None:
self.sink = sink

View File

@@ -23,7 +23,7 @@ import time
import usb.core
import usb.util
from typing import Optional, Set
from typing import Optional
from usb.core import Device as UsbDevice
from usb.core import USBError
from usb.util import CTRL_TYPE_CLASS, CTRL_RECIPIENT_OTHER
@@ -49,7 +49,7 @@ logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# Global
# -----------------------------------------------------------------------------
devices_in_use: Set[int] = set()
devices_in_use: set[int] = set()
# -----------------------------------------------------------------------------

View File

@@ -27,11 +27,8 @@ from typing import (
Any,
Awaitable,
Callable,
List,
Optional,
Protocol,
Set,
Tuple,
TypeVar,
Union,
overload,
@@ -156,7 +153,7 @@ class EventWatcher:
```
'''
handlers: List[Tuple[pyee.EventEmitter, str, Callable[..., Any]]]
handlers: list[tuple[pyee.EventEmitter, str, Callable[..., Any]]]
def __init__(self) -> None:
self.handlers = []
@@ -329,7 +326,7 @@ class AsyncRunner:
default_queue = WorkQueue()
# Shared set of running tasks
running_tasks: Set[Awaitable] = set()
running_tasks: set[Awaitable] = set()
@staticmethod
def run_in_task(queue=None):

View File

@@ -16,7 +16,7 @@
# Imports
# -----------------------------------------------------------------------------
import struct
from typing import Dict, Optional, Type
from typing import Optional
from bumble.hci import (
name_or_number,
@@ -283,7 +283,7 @@ class HCI_Dynamic_Audio_Buffer_Command(HCI_Command):
# -----------------------------------------------------------------------------
class HCI_Android_Vendor_Event(HCI_Extended_Event):
event_code: int = HCI_VENDOR_EVENT
subevent_classes: Dict[int, Type[HCI_Extended_Event]] = {}
subevent_classes: dict[int, type[HCI_Extended_Event]] = {}
@classmethod
def subclass_from_parameters(

View File

@@ -19,7 +19,7 @@ import asyncio
import sys
import os
import logging
from typing import Any, Dict
from typing import Any
from bumble.device import Device
from bumble.transport import open_transport_or_link
@@ -36,7 +36,7 @@ from bumble.a2dp import (
SbcMediaCodecInformation,
)
Context: Dict[Any, Any] = {'output': None}
Context: dict[Any, Any] = {'output': None}
# -----------------------------------------------------------------------------

View File

@@ -25,7 +25,6 @@ from bumble.device import Device, Peer
from bumble.transport import open_transport
from bumble.profiles.ancs import (
AncsClient,
AppAttribute,
AppAttributeId,
EventFlags,
EventId,

View File

@@ -25,8 +25,6 @@ from bumble.core import AdvertisingData
from bumble.device import Device
from bumble.hci import (
Address,
OwnAddressType,
HCI_LE_Set_Extended_Advertising_Parameters_Command,
)
from bumble.profiles.cap import CommonAudioServiceService
from bumble.profiles.csip import CoordinatedSetIdentificationService, SirkType

View File

@@ -19,7 +19,6 @@ import asyncio
import sys
import os
import logging
from bumble.colors import color
from bumble.hci import Address
from bumble.device import Device
from bumble.transport import open_transport_or_link

View File

@@ -25,7 +25,7 @@ import os
import random
import struct
import sys
from typing import Any, List, Union
from typing import Any, Union
from bumble.device import Device, Peer
from bumble import transport
@@ -342,7 +342,7 @@ async def server(device: Device) -> None:
byteorder='big',
)
characteristics: List[gatt.Characteristic] = [
characteristics: list[gatt.Characteristic] = [
c1,
c2,
c3,

View File

@@ -22,7 +22,6 @@ import os
from bumble.core import AdvertisingData
from bumble.device import Device
from bumble import att
from bumble.profiles.hap import (
HearingAccessService,
HearingAidFeatures,

View File

@@ -25,8 +25,6 @@ import io
import struct
import secrets
from typing import Dict
from bumble.core import AdvertisingData
from bumble.device import Device
from bumble.hci import (
@@ -73,7 +71,7 @@ def _sink_pac_record() -> PacRecord:
)
file_outputs: Dict[AseStateMachine, io.BufferedWriter] = {}
file_outputs: dict[AseStateMachine, io.BufferedWriter] = {}
# -----------------------------------------------------------------------------

View File

@@ -22,7 +22,6 @@ Invoke tasks
import os
import glob
import shutil
import urllib
from pathlib import Path
from invoke import task, call, Collection
from invoke.exceptions import Exit, UnexpectedExit

View File

@@ -15,7 +15,6 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from enum import IntEnum
from bumble.core import AdvertisingData, Appearance, UUID, get_dict_key_by_value

View File

@@ -17,7 +17,6 @@
# -----------------------------------------------------------------------------
import asyncio
import pytest
import functools
import pytest_asyncio
import logging
import sys

View File

@@ -21,7 +21,7 @@ import os
import pytest
import pytest_asyncio
from typing import Tuple, Optional
from typing import Optional
from .test_utils import TwoDevices
from bumble import core
@@ -194,7 +194,7 @@ async def test_slc_with_minimal_features():
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_slc(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
async def test_slc(hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol]):
hf, ag = hfp_connections
assert hf.supported_ag_features == ag.supported_ag_features
@@ -207,7 +207,7 @@ async def test_slc(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_ag_indicator(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
async def test_ag_indicator(hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol]):
hf, ag = hfp_connections
future = asyncio.get_running_loop().create_future()
@@ -222,7 +222,7 @@ async def test_ag_indicator(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtoco
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_hf_indicator(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
async def test_hf_indicator(hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol]):
hf, ag = hfp_connections
future = asyncio.get_running_loop().create_future()
@@ -237,7 +237,7 @@ async def test_hf_indicator(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtoco
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_codec_negotiation(
hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]
hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol]
):
hf, ag = hfp_connections
@@ -254,7 +254,7 @@ async def test_codec_negotiation(
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_dial(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
async def test_dial(hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol]):
hf, ag = hfp_connections
NUMBER = 'ATD123456789'
@@ -268,7 +268,7 @@ async def test_dial(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_answer(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
async def test_answer(hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol]):
hf, ag = hfp_connections
future = asyncio.get_running_loop().create_future()
@@ -281,7 +281,7 @@ async def test_answer(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_reject_incoming_call(
hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]
hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol]
):
hf, ag = hfp_connections
@@ -294,7 +294,7 @@ async def test_reject_incoming_call(
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_terminate_call(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
async def test_terminate_call(hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol]):
hf, ag = hfp_connections
future = asyncio.get_running_loop().create_future()
@@ -307,7 +307,7 @@ async def test_terminate_call(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProto
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_query_calls_without_calls(
hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]
hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol]
):
hf, ag = hfp_connections
@@ -317,7 +317,7 @@ async def test_query_calls_without_calls(
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_query_calls_with_calls(
hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]
hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol]
):
hf, ag = hfp_connections
ag.calls.append(
@@ -347,7 +347,7 @@ async def test_query_calls_with_calls(
),
)
async def test_hold_call_without_call_index(
hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol],
hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol],
operation: hfp.CallHoldOperation,
):
hf, ag = hfp_connections
@@ -369,7 +369,7 @@ async def test_hold_call_without_call_index(
),
)
async def test_hold_call_with_call_index(
hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol],
hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol],
operation: hfp.CallHoldOperation,
):
hf, ag = hfp_connections
@@ -393,7 +393,7 @@ async def test_hold_call_with_call_index(
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_ring(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
async def test_ring(hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol]):
hf, ag = hfp_connections
ring_future = asyncio.get_running_loop().create_future()
hf.on("ring", lambda: ring_future.set_result(None))
@@ -405,7 +405,7 @@ async def test_ring(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_speaker_volume(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
async def test_speaker_volume(hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol]):
hf, ag = hfp_connections
speaker_volume_future = asyncio.get_running_loop().create_future()
hf.on("speaker_volume", speaker_volume_future.set_result)
@@ -418,7 +418,7 @@ async def test_speaker_volume(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProto
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_microphone_volume(
hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]
hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol]
):
hf, ag = hfp_connections
microphone_volume_future = asyncio.get_running_loop().create_future()
@@ -431,7 +431,7 @@ async def test_microphone_volume(
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_cli_notification(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
async def test_cli_notification(hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol]):
hf, ag = hfp_connections
cli_notification_future = asyncio.get_running_loop().create_future()
hf.on("cli_notification", cli_notification_future.set_result)
@@ -448,7 +448,7 @@ async def test_cli_notification(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgPro
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_voice_recognition_from_hf(
hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]
hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol]
):
hf, ag = hfp_connections
voice_recognition_future = asyncio.get_running_loop().create_future()
@@ -462,7 +462,7 @@ async def test_voice_recognition_from_hf(
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_voice_recognition_from_ag(
hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]
hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol]
):
hf, ag = hfp_connections
voice_recognition_future = asyncio.get_running_loop().create_future()
@@ -572,7 +572,7 @@ async def test_sco_setup():
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_hf_batched_response(
hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]
hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol]
):
hf, ag = hfp_connections
@@ -584,7 +584,7 @@ async def test_hf_batched_response(
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_ag_batched_commands(
hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]
hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol]
):
hf, ag = hfp_connections

View File

@@ -17,7 +17,6 @@
# -----------------------------------------------------------------------------
import asyncio
import pytest
from typing import List
from . import test_utils
from bumble import core
@@ -73,7 +72,7 @@ async def test_connection_and_disconnection() -> None:
multiplexer = await Client(devices.connections[1]).start()
dlcs = await asyncio.gather(accept_future, multiplexer.open_dlc(channel))
queues: List[asyncio.Queue] = [asyncio.Queue(), asyncio.Queue()]
queues: list[asyncio.Queue] = [asyncio.Queue(), asyncio.Queue()]
for dlc, queue in zip(dlcs, queues):
dlc.sink = queue.put_nowait

View File

@@ -16,7 +16,7 @@
# Imports
# -----------------------------------------------------------------------------
import asyncio
from typing import List, Optional, Type
from typing import Optional
from typing_extensions import Self
from bumble.controller import Controller
@@ -29,7 +29,7 @@ from bumble.hci import Address
# -----------------------------------------------------------------------------
class TwoDevices:
connections: List[Optional[Connection]]
connections: list[Optional[Connection]]
def __init__(self) -> None:
self.connections = [None, None]
@@ -83,7 +83,7 @@ class TwoDevices:
return self.devices[index]
@classmethod
async def create_with_connection(cls: Type[Self]) -> Self:
async def create_with_connection(cls: type[Self]) -> Self:
devices = cls()
await devices.setup_connection()
return devices

View File

@@ -18,7 +18,6 @@
from __future__ import annotations
import enum
import logging
from typing import Dict, List
from bumble.core import PhysicalTransport, CommandTimeoutError
from bumble.device import Device, DeviceConfiguration
@@ -101,7 +100,7 @@ class Speaker:
self.stream_state = Speaker.StreamState.IDLE
self.audio_extractor = AudioExtractor.create(codec)
def sdp_records(self) -> Dict[int, List[ServiceAttribute]]:
def sdp_records(self) -> dict[int, list[ServiceAttribute]]:
service_record_handle = 0x00010001
return {
service_record_handle: make_audio_sink_service_sdp_records(