From e54a26393e5f22e29569a69626641133655f8985 Mon Sep 17 00:00:00 2001 From: Josh Wu Date: Wed, 22 Oct 2025 20:52:20 +0800 Subject: [PATCH] AVDTP: Add missing type annotations --- bumble/avdtp.py | 63 +++++++++++++++++++++++++++++++++---------------- 1 file changed, 43 insertions(+), 20 deletions(-) diff --git a/bumble/avdtp.py b/bumble/avdtp.py index d1c1285d..0f7bebe9 100644 --- a/bumble/avdtp.py +++ b/bumble/avdtp.py @@ -35,6 +35,8 @@ from typing import ( cast, ) +from typing_extensions import override + from bumble import a2dp, device, hci, l2cap, sdp, utils from bumble.colors import color from bumble.core import ( @@ -1266,6 +1268,7 @@ class Protocol(utils.EventEmitter): streams: dict[int, Stream] transaction_results: list[Optional[asyncio.Future[Message]]] channel_connector: Callable[[], Awaitable[l2cap.ClassicChannel]] + channel_acceptor: Optional[Stream] EVENT_OPEN = "open" EVENT_CLOSE = "close" @@ -1909,9 +1912,11 @@ class Stream: self.change_state(State.IDLE) - def on_set_configuration_command(self, configuration): + def on_set_configuration_command( + self, configuration: Iterable[ServiceCapabilities] + ) -> Optional[Message]: if self.state != State.IDLE: - return Set_Configuration_Reject(AVDTP_BAD_STATE_ERROR) + return Set_Configuration_Reject(error_code=AVDTP_BAD_STATE_ERROR) result = self.local_endpoint.on_set_configuration_command(configuration) if result is not None: @@ -1920,19 +1925,21 @@ class Stream: self.change_state(State.CONFIGURED) return None - def on_get_configuration_command(self): + def on_get_configuration_command(self) -> Optional[Message]: if self.state not in ( State.CONFIGURED, State.OPEN, State.STREAMING, ): - return Get_Configuration_Reject(AVDTP_BAD_STATE_ERROR) + return Get_Configuration_Reject(error_code=AVDTP_BAD_STATE_ERROR) return self.local_endpoint.on_get_configuration_command() - def on_reconfigure_command(self, configuration): + def on_reconfigure_command( + self, configuration: Iterable[ServiceCapabilities] + ) -> Optional[Message]: if self.state != State.OPEN: - return Reconfigure_Reject(AVDTP_BAD_STATE_ERROR) + return Reconfigure_Reject(error_code=AVDTP_BAD_STATE_ERROR) result = self.local_endpoint.on_reconfigure_command(configuration) if result is not None: @@ -1940,7 +1947,7 @@ class Stream: return None - def on_open_command(self): + def on_open_command(self) -> Optional[Message]: if self.state != State.CONFIGURED: return Open_Reject(AVDTP_BAD_STATE_ERROR) @@ -1954,7 +1961,7 @@ class Stream: self.change_state(State.OPEN) return None - def on_start_command(self): + def on_start_command(self) -> Optional[Message]: if self.state != State.OPEN: return Open_Reject(AVDTP_BAD_STATE_ERROR) @@ -1970,7 +1977,7 @@ class Stream: self.change_state(State.STREAMING) return None - def on_suspend_command(self): + def on_suspend_command(self) -> Optional[Message]: if self.state != State.STREAMING: return Open_Reject(AVDTP_BAD_STATE_ERROR) @@ -1981,7 +1988,7 @@ class Stream: self.change_state(State.OPEN) return None - def on_close_command(self): + def on_close_command(self) -> Optional[Message]: if self.state not in (State.OPEN, State.STREAMING): return Open_Reject(AVDTP_BAD_STATE_ERROR) @@ -2000,13 +2007,14 @@ class Stream: return None - def on_abort_command(self): + def on_abort_command(self) -> Optional[Message]: if self.rtp_channel is None: # No need to wait self.change_state(State.IDLE) else: # Wait for the RTP channel to be closed self.change_state(State.ABORTING) + return None def on_l2cap_connection(self, channel: l2cap.ClassicChannel) -> None: logger.debug(color('<<< stream channel connected', 'magenta')) @@ -2151,10 +2159,15 @@ class LocalStreamEndPoint(StreamEndPoint, utils.EventEmitter): async def close(self) -> None: """[Source Only] Handles when receiving close command.""" - def on_reconfigure_command(self, command) -> Optional[Message]: + def on_reconfigure_command( + self, command: Iterable[ServiceCapabilities] + ) -> Optional[Message]: + del command # unused. return None - def on_set_configuration_command(self, configuration) -> Optional[Message]: + def on_set_configuration_command( + self, configuration: Iterable[ServiceCapabilities] + ) -> Optional[Message]: logger.debug( '<<< received configuration: ' f'{",".join([str(capability) for capability in configuration])}' @@ -2210,13 +2223,13 @@ class LocalSource(LocalStreamEndPoint): protocol: Protocol, seid: int, codec_capabilities: MediaCodecCapabilities, - other_capabilitiles: Iterable[ServiceCapabilities], + other_capabilities: Iterable[ServiceCapabilities], packet_pump: MediaPacketPump, ) -> None: capabilities = [ ServiceCapabilities(AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY), codec_capabilities, - ] + list(other_capabilitiles) + ] + list(other_capabilities) super().__init__( protocol, seid, @@ -2227,23 +2240,29 @@ class LocalSource(LocalStreamEndPoint): ) self.packet_pump = packet_pump + @override async def start(self) -> None: if self.packet_pump and self.stream and self.stream.rtp_channel: return await self.packet_pump.start(self.stream.rtp_channel) self.emit(self.EVENT_START) + @override async def stop(self) -> None: if self.packet_pump: return await self.packet_pump.stop() self.emit(self.EVENT_STOP) - def on_start_command(self): + @override + def on_start_command(self) -> Optional[Message]: asyncio.create_task(self.start()) + return None - def on_suspend_command(self): + @override + def on_suspend_command(self) -> Optional[Message]: asyncio.create_task(self.stop()) + return None # ----------------------------------------------------------------------------- @@ -2263,16 +2282,20 @@ class LocalSink(LocalStreamEndPoint): capabilities, ) - def on_rtp_channel_open(self): + def on_rtp_channel_open(self) -> None: logger.debug(color('<<< RTP channel open', 'magenta')) + if not self.stream: + raise InvalidStateError('Stream is None') + if not self.stream.rtp_channel: + raise InvalidStateError('RTP channel is None') self.stream.rtp_channel.sink = self.on_avdtp_packet super().on_rtp_channel_open() - def on_rtp_channel_close(self): + def on_rtp_channel_close(self) -> None: logger.debug(color('<<< RTP channel close', 'magenta')) super().on_rtp_channel_close() - def on_avdtp_packet(self, packet): + def on_avdtp_packet(self, packet: bytes) -> None: rtp_packet = MediaPacket.from_bytes(packet) logger.debug( f'{color("<<< RTP Packet:", "green")} '