AVDTP: Add missing type annotations

This commit is contained in:
Josh Wu
2025-10-22 20:52:20 +08:00
parent 5dc76cf7b4
commit e54a26393e

View File

@@ -35,6 +35,8 @@ from typing import (
cast,
)
from typing_extensions import override
from bumble import a2dp, device, hci, l2cap, sdp, utils
from bumble.colors import color
from bumble.core import (
@@ -1266,6 +1268,7 @@ class Protocol(utils.EventEmitter):
streams: dict[int, Stream]
transaction_results: list[Optional[asyncio.Future[Message]]]
channel_connector: Callable[[], Awaitable[l2cap.ClassicChannel]]
channel_acceptor: Optional[Stream]
EVENT_OPEN = "open"
EVENT_CLOSE = "close"
@@ -1909,9 +1912,11 @@ class Stream:
self.change_state(State.IDLE)
def on_set_configuration_command(self, configuration):
def on_set_configuration_command(
self, configuration: Iterable[ServiceCapabilities]
) -> Optional[Message]:
if self.state != State.IDLE:
return Set_Configuration_Reject(AVDTP_BAD_STATE_ERROR)
return Set_Configuration_Reject(error_code=AVDTP_BAD_STATE_ERROR)
result = self.local_endpoint.on_set_configuration_command(configuration)
if result is not None:
@@ -1920,19 +1925,21 @@ class Stream:
self.change_state(State.CONFIGURED)
return None
def on_get_configuration_command(self):
def on_get_configuration_command(self) -> Optional[Message]:
if self.state not in (
State.CONFIGURED,
State.OPEN,
State.STREAMING,
):
return Get_Configuration_Reject(AVDTP_BAD_STATE_ERROR)
return Get_Configuration_Reject(error_code=AVDTP_BAD_STATE_ERROR)
return self.local_endpoint.on_get_configuration_command()
def on_reconfigure_command(self, configuration):
def on_reconfigure_command(
self, configuration: Iterable[ServiceCapabilities]
) -> Optional[Message]:
if self.state != State.OPEN:
return Reconfigure_Reject(AVDTP_BAD_STATE_ERROR)
return Reconfigure_Reject(error_code=AVDTP_BAD_STATE_ERROR)
result = self.local_endpoint.on_reconfigure_command(configuration)
if result is not None:
@@ -1940,7 +1947,7 @@ class Stream:
return None
def on_open_command(self):
def on_open_command(self) -> Optional[Message]:
if self.state != State.CONFIGURED:
return Open_Reject(AVDTP_BAD_STATE_ERROR)
@@ -1954,7 +1961,7 @@ class Stream:
self.change_state(State.OPEN)
return None
def on_start_command(self):
def on_start_command(self) -> Optional[Message]:
if self.state != State.OPEN:
return Open_Reject(AVDTP_BAD_STATE_ERROR)
@@ -1970,7 +1977,7 @@ class Stream:
self.change_state(State.STREAMING)
return None
def on_suspend_command(self):
def on_suspend_command(self) -> Optional[Message]:
if self.state != State.STREAMING:
return Open_Reject(AVDTP_BAD_STATE_ERROR)
@@ -1981,7 +1988,7 @@ class Stream:
self.change_state(State.OPEN)
return None
def on_close_command(self):
def on_close_command(self) -> Optional[Message]:
if self.state not in (State.OPEN, State.STREAMING):
return Open_Reject(AVDTP_BAD_STATE_ERROR)
@@ -2000,13 +2007,14 @@ class Stream:
return None
def on_abort_command(self):
def on_abort_command(self) -> Optional[Message]:
if self.rtp_channel is None:
# No need to wait
self.change_state(State.IDLE)
else:
# Wait for the RTP channel to be closed
self.change_state(State.ABORTING)
return None
def on_l2cap_connection(self, channel: l2cap.ClassicChannel) -> None:
logger.debug(color('<<< stream channel connected', 'magenta'))
@@ -2151,10 +2159,15 @@ class LocalStreamEndPoint(StreamEndPoint, utils.EventEmitter):
async def close(self) -> None:
"""[Source Only] Handles when receiving close command."""
def on_reconfigure_command(self, command) -> Optional[Message]:
def on_reconfigure_command(
self, command: Iterable[ServiceCapabilities]
) -> Optional[Message]:
del command # unused.
return None
def on_set_configuration_command(self, configuration) -> Optional[Message]:
def on_set_configuration_command(
self, configuration: Iterable[ServiceCapabilities]
) -> Optional[Message]:
logger.debug(
'<<< received configuration: '
f'{",".join([str(capability) for capability in configuration])}'
@@ -2210,13 +2223,13 @@ class LocalSource(LocalStreamEndPoint):
protocol: Protocol,
seid: int,
codec_capabilities: MediaCodecCapabilities,
other_capabilitiles: Iterable[ServiceCapabilities],
other_capabilities: Iterable[ServiceCapabilities],
packet_pump: MediaPacketPump,
) -> None:
capabilities = [
ServiceCapabilities(AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY),
codec_capabilities,
] + list(other_capabilitiles)
] + list(other_capabilities)
super().__init__(
protocol,
seid,
@@ -2227,23 +2240,29 @@ class LocalSource(LocalStreamEndPoint):
)
self.packet_pump = packet_pump
@override
async def start(self) -> None:
if self.packet_pump and self.stream and self.stream.rtp_channel:
return await self.packet_pump.start(self.stream.rtp_channel)
self.emit(self.EVENT_START)
@override
async def stop(self) -> None:
if self.packet_pump:
return await self.packet_pump.stop()
self.emit(self.EVENT_STOP)
def on_start_command(self):
@override
def on_start_command(self) -> Optional[Message]:
asyncio.create_task(self.start())
return None
def on_suspend_command(self):
@override
def on_suspend_command(self) -> Optional[Message]:
asyncio.create_task(self.stop())
return None
# -----------------------------------------------------------------------------
@@ -2263,16 +2282,20 @@ class LocalSink(LocalStreamEndPoint):
capabilities,
)
def on_rtp_channel_open(self):
def on_rtp_channel_open(self) -> None:
logger.debug(color('<<< RTP channel open', 'magenta'))
if not self.stream:
raise InvalidStateError('Stream is None')
if not self.stream.rtp_channel:
raise InvalidStateError('RTP channel is None')
self.stream.rtp_channel.sink = self.on_avdtp_packet
super().on_rtp_channel_open()
def on_rtp_channel_close(self):
def on_rtp_channel_close(self) -> None:
logger.debug(color('<<< RTP channel close', 'magenta'))
super().on_rtp_channel_close()
def on_avdtp_packet(self, packet):
def on_avdtp_packet(self, packet: bytes) -> None:
rtp_packet = MediaPacket.from_bytes(packet)
logger.debug(
f'{color("<<< RTP Packet:", "green")} '