Replace deprecated typing aliases

This commit is contained in:
Josh Wu
2025-06-07 23:29:26 +08:00
parent 3a64772cc5
commit 8a0cd5d0d1
68 changed files with 366 additions and 424 deletions

View File

@@ -37,10 +37,7 @@ from typing import (
Any,
Callable,
ClassVar,
Deque,
Dict,
Optional,
Type,
TypeVar,
Union,
cast,
@@ -436,7 +433,7 @@ class AdvertisingEventProperties:
@classmethod
def from_advertising_type(
cls: Type[AdvertisingEventProperties],
cls: type[AdvertisingEventProperties],
advertising_type: AdvertisingType,
) -> AdvertisingEventProperties:
return cls(
@@ -1343,7 +1340,7 @@ class Peer:
return self.gatt_client.get_characteristics_by_uuid(uuid, service)
def create_service_proxy(
self, proxy_class: Type[_PROXY_CLASS]
self, proxy_class: type[_PROXY_CLASS]
) -> Optional[_PROXY_CLASS]:
if proxy := proxy_class.from_client(self.gatt_client):
return cast(_PROXY_CLASS, proxy)
@@ -1351,7 +1348,7 @@ class Peer:
return None
async def discover_service_and_create_proxy(
self, proxy_class: Type[_PROXY_CLASS]
self, proxy_class: type[_PROXY_CLASS]
) -> Optional[_PROXY_CLASS]:
# Discover the first matching service and its characteristics
services = await self.discover_service(proxy_class.SERVICE_CLASS.UUID)
@@ -1547,7 +1544,7 @@ class IsoPacketStream:
self.iso_link = iso_link
self.data_packet_queue = iso_link.data_packet_queue
self.data_packet_queue.on('flow', self._on_flow)
self._thresholds: Deque[int] = collections.deque()
self._thresholds: collections.deque[int] = collections.deque()
self._semaphore = asyncio.Semaphore(max_queue_size)
def _on_flow(self) -> None:
@@ -1956,9 +1953,9 @@ class DeviceConfiguration:
gatt_service_enabled: bool = True
def __post_init__(self) -> None:
self.gatt_services: list[Dict[str, Any]] = []
self.gatt_services: list[dict[str, Any]] = []
def load_from_dict(self, config: Dict[str, Any]) -> None:
def load_from_dict(self, config: dict[str, Any]) -> None:
config = copy.deepcopy(config)
# Load simple properties
@@ -2018,13 +2015,13 @@ class DeviceConfiguration:
self.load_from_dict(json.load(file))
@classmethod
def from_file(cls: Type[Self], filename: str) -> Self:
def from_file(cls: type[Self], filename: str) -> Self:
config = cls()
config.load_from_file(filename)
return config
@classmethod
def from_dict(cls: Type[Self], config: Dict[str, Any]) -> Self:
def from_dict(cls: type[Self], config: dict[str, Any]) -> Self:
device_config = cls()
device_config.load_from_dict(config)
return device_config
@@ -2121,22 +2118,22 @@ class Device(utils.CompositeEventEmitter):
advertising_data: bytes
scan_response_data: bytes
cs_capabilities: ChannelSoundingCapabilities | None = None
connections: Dict[int, Connection]
pending_connections: Dict[hci.Address, Connection]
classic_pending_accepts: Dict[
connections: dict[int, Connection]
pending_connections: dict[hci.Address, Connection]
classic_pending_accepts: dict[
hci.Address,
list[asyncio.Future[Union[Connection, tuple[hci.Address, int, int]]]],
]
advertisement_accumulators: Dict[hci.Address, AdvertisementDataAccumulator]
advertisement_accumulators: dict[hci.Address, AdvertisementDataAccumulator]
periodic_advertising_syncs: list[PeriodicAdvertisingSync]
config: DeviceConfiguration
legacy_advertiser: Optional[LegacyAdvertiser]
sco_links: Dict[int, ScoLink]
cis_links: Dict[int, CisLink]
sco_links: dict[int, ScoLink]
cis_links: dict[int, CisLink]
bigs: dict[int, Big]
bis_links: dict[int, BisLink]
big_syncs: dict[int, BigSync]
_pending_cis: Dict[int, tuple[int, int]]
_pending_cis: dict[int, tuple[int, int]]
gatt_service: gatt_service.GenericAttributeProfileService | None = None
EVENT_ADVERTISEMENT = "advertisement"
@@ -2296,8 +2293,8 @@ class Device(utils.CompositeEventEmitter):
self.address_generation_offload = config.address_generation_offload
# Extended advertising.
self.extended_advertising_sets: Dict[int, AdvertisingSet] = {}
self.connecting_extended_advertising_sets: Dict[int, AdvertisingSet] = {}
self.extended_advertising_sets: dict[int, AdvertisingSet] = {}
self.connecting_extended_advertising_sets: dict[int, AdvertisingSet] = {}
# Legacy advertising.
# The advertising and scan response data, as well as the advertising interval
@@ -4273,11 +4270,11 @@ class Device(utils.CompositeEventEmitter):
self.smp_manager.pairing_config_factory = pairing_config_factory
@property
def smp_session_proxy(self) -> Type[smp.Session]:
def smp_session_proxy(self) -> type[smp.Session]:
return self.smp_manager.session_proxy
@smp_session_proxy.setter
def smp_session_proxy(self, session_proxy: Type[smp.Session]) -> None:
def smp_session_proxy(self, session_proxy: type[smp.Session]) -> None:
self.smp_manager.session_proxy = session_proxy
async def pair(self, connection):