From eef5304a36a0289a3fb197a57d7761ea80ce5ae6 Mon Sep 17 00:00:00 2001 From: Josh Wu Date: Fri, 19 Sep 2025 14:39:34 +0800 Subject: [PATCH] AVCTP: Change callback packet type to bytes --- bumble/avctp.py | 20 ++++++++------------ bumble/avrcp.py | 38 +++++++++++++++++++------------------- 2 files changed, 27 insertions(+), 31 deletions(-) diff --git a/bumble/avctp.py b/bumble/avctp.py index 7046785e..690e7a79 100644 --- a/bumble/avctp.py +++ b/bumble/avctp.py @@ -19,10 +19,11 @@ from __future__ import annotations import logging import struct +from collections.abc import Callable from enum import IntEnum -from typing import Callable, Optional, cast +from typing import Optional -from bumble import avc, core, l2cap +from bumble import core, l2cap from bumble.colors import color # ----------------------------------------------------------------------------- @@ -144,9 +145,9 @@ class MessageAssembler: # ----------------------------------------------------------------------------- class Protocol: - CommandHandler = Callable[[int, avc.CommandFrame], None] + CommandHandler = Callable[[int, bytes], None] command_handlers: dict[int, CommandHandler] # Command handlers, by PID - ResponseHandler = Callable[[int, Optional[avc.ResponseFrame]], None] + ResponseHandler = Callable[[int, Optional[bytes]], None] response_handlers: dict[int, ResponseHandler] # Response handlers, by PID next_transaction_label: int message_assembler: MessageAssembler @@ -204,20 +205,15 @@ class Protocol: self.send_ipid(transaction_label, pid) return - command_frame = cast(avc.CommandFrame, avc.Frame.from_bytes(payload)) - self.command_handlers[pid](transaction_label, command_frame) + self.command_handlers[pid](transaction_label, payload) else: if pid not in self.response_handlers: logger.warning(f"no response handler for PID {pid}") return # By convention, for an ipid, send a None payload to the response handler. - if ipid: - response_frame = None - else: - response_frame = cast(avc.ResponseFrame, avc.Frame.from_bytes(payload)) - - self.response_handlers[pid](transaction_label, response_frame) + response_payload = None if ipid else payload + self.response_handlers[pid](transaction_label, response_payload) def send_message( self, diff --git a/bumble/avrcp.py b/bumble/avrcp.py index 71b24f8e..52bd7b2c 100644 --- a/bumble/avrcp.py +++ b/bumble/avrcp.py @@ -22,21 +22,9 @@ import enum import functools import logging import struct +from collections.abc import AsyncIterator, Awaitable, Callable, Iterable, Sequence from dataclasses import dataclass, field -from typing import ( - AsyncIterator, - Awaitable, - Callable, - ClassVar, - Iterable, - List, - Optional, - Sequence, - SupportsBytes, - TypeVar, - Union, - cast, -) +from typing import ClassVar, Optional, SupportsBytes, TypeVar, Union from bumble import avc, avctp, core, hci, l2cap, utils from bumble.colors import color @@ -1762,7 +1750,11 @@ class Protocol(utils.EventEmitter): ), ) response = self._check_response(response_context, GetCapabilitiesResponse) - return cast(List[EventId], response.capabilities) + return list( + capability + for capability in response.capabilities + if isinstance(capability, EventId) + ) async def get_play_status(self) -> SongAndPlayStatus: """Get the play status of the connected peer.""" @@ -2012,9 +2004,12 @@ class Protocol(utils.EventEmitter): self.emit(self.EVENT_STOP) - def _on_avctp_command( - self, transaction_label: int, command: avc.CommandFrame - ) -> None: + def _on_avctp_command(self, transaction_label: int, payload: bytes) -> None: + command = avc.CommandFrame.from_bytes(payload) + if not isinstance(command, avc.CommandFrame): + raise core.InvalidPacketError( + f"{command} is not a valid AV/C Command Frame" + ) logger.debug( f"<<< AVCTP Command, transaction_label={transaction_label}: " f"{command}" ) @@ -2073,8 +2068,13 @@ class Protocol(utils.EventEmitter): self.send_not_implemented_response(transaction_label, command) def _on_avctp_response( - self, transaction_label: int, response: Optional[avc.ResponseFrame] + self, transaction_label: int, payload: Optional[bytes] ) -> None: + response = avc.ResponseFrame.from_bytes(payload) if payload else None + if not isinstance(response, avc.ResponseFrame): + raise core.InvalidPacketError( + f"{response} is not a valid AV/C Response Frame" + ) logger.debug( f"<<< AVCTP Response, transaction_label={transaction_label}: {response}" )