diff --git a/bumble/att.py b/bumble/att.py index 235d907..6bac58d 100644 --- a/bumble/att.py +++ b/bumble/att.py @@ -836,6 +836,9 @@ class Attribute(utils.EventEmitter, Generic[_T]): READ_REQUIRES_AUTHORIZATION = Permissions.READ_REQUIRES_AUTHORIZATION WRITE_REQUIRES_AUTHORIZATION = Permissions.WRITE_REQUIRES_AUTHORIZATION + EVENT_READ = "read" + EVENT_WRITE = "write" + value: Union[AttributeValue[_T], _T, None] def __init__( @@ -906,7 +909,7 @@ class Attribute(utils.EventEmitter, Generic[_T]): else: value = self.value - self.emit('read', connection, b'' if value is None else value) + self.emit(self.EVENT_READ, connection, b'' if value is None else value) return b'' if value is None else self.encode_value(value) @@ -947,7 +950,7 @@ class Attribute(utils.EventEmitter, Generic[_T]): else: self.value = decoded_value - self.emit('write', connection, decoded_value) + self.emit(self.EVENT_WRITE, connection, decoded_value) def __repr__(self): if isinstance(self.value, bytes): diff --git a/bumble/avctp.py b/bumble/avctp.py index 6d70256..c59d258 100644 --- a/bumble/avctp.py +++ b/bumble/avctp.py @@ -166,8 +166,8 @@ class Protocol: # Register to receive PDUs from the channel l2cap_channel.sink = self.on_pdu - l2cap_channel.on("open", self.on_l2cap_channel_open) - l2cap_channel.on("close", self.on_l2cap_channel_close) + l2cap_channel.on(l2cap_channel.EVENT_OPEN, self.on_l2cap_channel_open) + l2cap_channel.on(l2cap_channel.EVENT_CLOSE, self.on_l2cap_channel_close) def on_l2cap_channel_open(self): logger.debug(color("<<< AVCTP channel open", "magenta")) diff --git a/bumble/avdtp.py b/bumble/avdtp.py index b4bb5fd..2593366 100644 --- a/bumble/avdtp.py +++ b/bumble/avdtp.py @@ -896,7 +896,7 @@ class Set_Configuration_Reject(Message): self.service_category = self.payload[0] self.error_code = self.payload[1] - def __init__(self, service_category, error_code): + def __init__(self, error_code: int, service_category: int = 0) -> None: super().__init__(payload=bytes([service_category, error_code])) self.service_category = service_category self.error_code = error_code @@ -1132,6 +1132,14 @@ class Security_Control_Command(Message): See Bluetooth AVDTP spec - 8.17.1 Security Control Command ''' + def init_from_payload(self): + # pylint: disable=attribute-defined-outside-init + self.acp_seid = self.payload[0] >> 2 + self.data = self.payload[1:] + + def __str__(self) -> str: + return self.to_string([f'ACP_SEID: {self.acp_seid}', f'data: {self.data}']) + # ----------------------------------------------------------------------------- @Message.subclass @@ -1200,6 +1208,9 @@ class Protocol(utils.EventEmitter): transaction_results: List[Optional[asyncio.Future[Message]]] channel_connector: Callable[[], Awaitable[l2cap.ClassicChannel]] + EVENT_OPEN = "open" + EVENT_CLOSE = "close" + class PacketType(enum.IntEnum): SINGLE_PACKET = 0 START_PACKET = 1 @@ -1239,8 +1250,8 @@ class Protocol(utils.EventEmitter): # Register to receive PDUs from the channel l2cap_channel.sink = self.on_pdu - l2cap_channel.on('open', self.on_l2cap_channel_open) - l2cap_channel.on('close', self.on_l2cap_channel_close) + l2cap_channel.on(l2cap_channel.EVENT_OPEN, self.on_l2cap_channel_open) + l2cap_channel.on(l2cap_channel.EVENT_CLOSE, self.on_l2cap_channel_close) def get_local_endpoint_by_seid(self, seid: int) -> Optional[LocalStreamEndPoint]: if 0 < seid <= len(self.local_endpoints): @@ -1410,20 +1421,20 @@ class Protocol(utils.EventEmitter): self.transaction_results[transaction_label] = None self.transaction_semaphore.release() - def on_l2cap_connection(self, channel): + def on_l2cap_connection(self, channel: l2cap.ClassicChannel) -> None: # Forward the channel to the endpoint that's expecting it if self.channel_acceptor is None: logger.warning(color('!!! l2cap connection with no acceptor', 'red')) return self.channel_acceptor.on_l2cap_connection(channel) - def on_l2cap_channel_open(self): + def on_l2cap_channel_open(self) -> None: logger.debug(color('<<< L2CAP channel open', 'magenta')) - self.emit('open') + self.emit(self.EVENT_OPEN) - def on_l2cap_channel_close(self): + def on_l2cap_channel_close(self) -> None: logger.debug(color('<<< L2CAP channel close', 'magenta')) - self.emit('close') + self.emit(self.EVENT_CLOSE) def send_message(self, transaction_label: int, message: Message) -> None: logger.debug( @@ -1541,28 +1552,34 @@ class Protocol(utils.EventEmitter): async def abort(self, seid: int) -> Abort_Response: return await self.send_command(Abort_Command(seid)) - def on_discover_command(self, _command): + def on_discover_command(self, command: Discover_Command) -> Optional[Message]: endpoint_infos = [ EndPointInfo(endpoint.seid, 0, endpoint.media_type, endpoint.tsep) for endpoint in self.local_endpoints ] return Discover_Response(endpoint_infos) - def on_get_capabilities_command(self, command): + def on_get_capabilities_command( + self, command: Get_Capabilities_Command + ) -> Optional[Message]: endpoint = self.get_local_endpoint_by_seid(command.acp_seid) if endpoint is None: return Get_Capabilities_Reject(AVDTP_BAD_ACP_SEID_ERROR) return Get_Capabilities_Response(endpoint.capabilities) - def on_get_all_capabilities_command(self, command): + def on_get_all_capabilities_command( + self, command: Get_All_Capabilities_Command + ) -> Optional[Message]: endpoint = self.get_local_endpoint_by_seid(command.acp_seid) if endpoint is None: return Get_All_Capabilities_Reject(AVDTP_BAD_ACP_SEID_ERROR) return Get_All_Capabilities_Response(endpoint.capabilities) - def on_set_configuration_command(self, command): + def on_set_configuration_command( + self, command: Set_Configuration_Command + ) -> Optional[Message]: endpoint = self.get_local_endpoint_by_seid(command.acp_seid) if endpoint is None: return Set_Configuration_Reject(AVDTP_BAD_ACP_SEID_ERROR) @@ -1578,7 +1595,9 @@ class Protocol(utils.EventEmitter): result = stream.on_set_configuration_command(command.capabilities) return result or Set_Configuration_Response() - def on_get_configuration_command(self, command): + def on_get_configuration_command( + self, command: Get_Configuration_Command + ) -> Optional[Message]: endpoint = self.get_local_endpoint_by_seid(command.acp_seid) if endpoint is None: return Get_Configuration_Reject(AVDTP_BAD_ACP_SEID_ERROR) @@ -1587,7 +1606,7 @@ class Protocol(utils.EventEmitter): return endpoint.stream.on_get_configuration_command() - def on_reconfigure_command(self, command): + def on_reconfigure_command(self, command: Reconfigure_Command) -> Optional[Message]: endpoint = self.get_local_endpoint_by_seid(command.acp_seid) if endpoint is None: return Reconfigure_Reject(0, AVDTP_BAD_ACP_SEID_ERROR) @@ -1597,7 +1616,7 @@ class Protocol(utils.EventEmitter): result = endpoint.stream.on_reconfigure_command(command.capabilities) return result or Reconfigure_Response() - def on_open_command(self, command): + def on_open_command(self, command: Open_Command) -> Optional[Message]: endpoint = self.get_local_endpoint_by_seid(command.acp_seid) if endpoint is None: return Open_Reject(AVDTP_BAD_ACP_SEID_ERROR) @@ -1607,25 +1626,26 @@ class Protocol(utils.EventEmitter): result = endpoint.stream.on_open_command() return result or Open_Response() - def on_start_command(self, command): + def on_start_command(self, command: Start_Command) -> Optional[Message]: for seid in command.acp_seids: endpoint = self.get_local_endpoint_by_seid(seid) if endpoint is None: return Start_Reject(seid, AVDTP_BAD_ACP_SEID_ERROR) if endpoint.stream is None: - return Start_Reject(AVDTP_BAD_STATE_ERROR) + return Start_Reject(seid, AVDTP_BAD_STATE_ERROR) # Start all streams # TODO: deal with partial failures for seid in command.acp_seids: endpoint = self.get_local_endpoint_by_seid(seid) - result = endpoint.stream.on_start_command() - if result is not None: + if not endpoint or not endpoint.stream: + raise InvalidStateError("Should already be checked!") + if (result := endpoint.stream.on_start_command()) is not None: return result return Start_Response() - def on_suspend_command(self, command): + def on_suspend_command(self, command: Suspend_Command) -> Optional[Message]: for seid in command.acp_seids: endpoint = self.get_local_endpoint_by_seid(seid) if endpoint is None: @@ -1637,13 +1657,14 @@ class Protocol(utils.EventEmitter): # TODO: deal with partial failures for seid in command.acp_seids: endpoint = self.get_local_endpoint_by_seid(seid) - result = endpoint.stream.on_suspend_command() - if result is not None: + if not endpoint or not endpoint.stream: + raise InvalidStateError("Should already be checked!") + if (result := endpoint.stream.on_suspend_command()) is not None: return result return Suspend_Response() - def on_close_command(self, command): + def on_close_command(self, command: Close_Command) -> Optional[Message]: endpoint = self.get_local_endpoint_by_seid(command.acp_seid) if endpoint is None: return Close_Reject(AVDTP_BAD_ACP_SEID_ERROR) @@ -1653,7 +1674,7 @@ class Protocol(utils.EventEmitter): result = endpoint.stream.on_close_command() return result or Close_Response() - def on_abort_command(self, command): + def on_abort_command(self, command: Abort_Command) -> Optional[Message]: endpoint = self.get_local_endpoint_by_seid(command.acp_seid) if endpoint is None or endpoint.stream is None: return Abort_Response() @@ -1661,15 +1682,17 @@ class Protocol(utils.EventEmitter): endpoint.stream.on_abort_command() return Abort_Response() - def on_security_control_command(self, command): + def on_security_control_command( + self, command: Security_Control_Command + ) -> Optional[Message]: endpoint = self.get_local_endpoint_by_seid(command.acp_seid) if endpoint is None: return Security_Control_Reject(AVDTP_BAD_ACP_SEID_ERROR) - result = endpoint.on_security_control_command(command.payload) + result = endpoint.on_security_control_command(command.data) return result or Security_Control_Response() - def on_delayreport_command(self, command): + def on_delayreport_command(self, command: DelayReport_Command) -> Optional[Message]: endpoint = self.get_local_endpoint_by_seid(command.acp_seid) if endpoint is None: return DelayReport_Reject(AVDTP_BAD_ACP_SEID_ERROR) @@ -1682,6 +1705,8 @@ class Protocol(utils.EventEmitter): class Listener(utils.EventEmitter): servers: Dict[int, Protocol] + EVENT_CONNECTION = "connection" + @staticmethod def create_registrar(device: device.Device): warnings.warn("Please use Listener.for_device()", DeprecationWarning) @@ -1716,7 +1741,7 @@ class Listener(utils.EventEmitter): l2cap_server = device.create_l2cap_server( spec=l2cap.ClassicChannelSpec(psm=AVDTP_PSM) ) - l2cap_server.on('connection', listener.on_l2cap_connection) + l2cap_server.on(l2cap_server.EVENT_CONNECTION, listener.on_l2cap_connection) return listener def on_l2cap_connection(self, channel: l2cap.ClassicChannel) -> None: @@ -1732,14 +1757,14 @@ class Listener(utils.EventEmitter): logger.debug('setting up new Protocol for the connection') server = Protocol(channel, self.version) self.set_server(channel.connection, server) - self.emit('connection', server) + self.emit(self.EVENT_CONNECTION, server) def on_channel_close(): logger.debug('removing Protocol for the connection') self.remove_server(channel.connection) - channel.on('open', on_channel_open) - channel.on('close', on_channel_close) + channel.on(channel.EVENT_OPEN, on_channel_open) + channel.on(channel.EVENT_CLOSE, on_channel_close) # ----------------------------------------------------------------------------- @@ -1788,6 +1813,7 @@ class Stream: ) async def start(self) -> None: + """[Source] Start streaming.""" # Auto-open if needed if self.state == AVDTP_CONFIGURED_STATE: await self.open() @@ -1804,6 +1830,7 @@ class Stream: self.change_state(AVDTP_STREAMING_STATE) async def stop(self) -> None: + """[Source] Stop streaming and transit to OPEN state.""" if self.state != AVDTP_STREAMING_STATE: raise InvalidStateError('current state is not STREAMING') @@ -1816,6 +1843,7 @@ class Stream: self.change_state(AVDTP_OPEN_STATE) async def close(self) -> None: + """[Source] Close channel and transit to IDLE state.""" if self.state not in (AVDTP_OPEN_STATE, AVDTP_STREAMING_STATE): raise InvalidStateError('current state is not OPEN or STREAMING') @@ -1847,7 +1875,7 @@ class Stream: self.change_state(AVDTP_CONFIGURED_STATE) return None - def on_get_configuration_command(self, configuration): + def on_get_configuration_command(self): if self.state not in ( AVDTP_CONFIGURED_STATE, AVDTP_OPEN_STATE, @@ -1855,7 +1883,7 @@ class Stream: ): return Get_Configuration_Reject(AVDTP_BAD_STATE_ERROR) - return self.local_endpoint.on_get_configuration_command(configuration) + return self.local_endpoint.on_get_configuration_command() def on_reconfigure_command(self, configuration): if self.state != AVDTP_OPEN_STATE: @@ -1935,20 +1963,20 @@ class Stream: # Wait for the RTP channel to be closed self.change_state(AVDTP_ABORTING_STATE) - def on_l2cap_connection(self, channel): + def on_l2cap_connection(self, channel: l2cap.ClassicChannel) -> None: logger.debug(color('<<< stream channel connected', 'magenta')) self.rtp_channel = channel - channel.on('open', self.on_l2cap_channel_open) - channel.on('close', self.on_l2cap_channel_close) + channel.on(channel.EVENT_OPEN, self.on_l2cap_channel_open) + channel.on(channel.EVENT_CLOSE, self.on_l2cap_channel_close) # We don't need more channels self.protocol.channel_acceptor = None - def on_l2cap_channel_open(self): + def on_l2cap_channel_open(self) -> None: logger.debug(color('<<< stream channel open', 'magenta')) self.local_endpoint.on_rtp_channel_open() - def on_l2cap_channel_close(self): + def on_l2cap_channel_close(self) -> None: logger.debug(color('<<< stream channel closed', 'magenta')) self.local_endpoint.on_rtp_channel_close() self.local_endpoint.in_use = 0 @@ -2065,6 +2093,19 @@ class DiscoveredStreamEndPoint(StreamEndPoint, StreamEndPointProxy): class LocalStreamEndPoint(StreamEndPoint, utils.EventEmitter): stream: Optional[Stream] + EVENT_CONFIGURATION = "configuration" + EVENT_OPEN = "open" + EVENT_START = "start" + EVENT_STOP = "stop" + EVENT_RTP_PACKET = "rtp_packet" + EVENT_SUSPEND = "suspend" + EVENT_CLOSE = "close" + EVENT_ABORT = "abort" + EVENT_DELAY_REPORT = "delay_report" + EVENT_SECURITY_CONTROL = "security_control" + EVENT_RTP_CHANNEL_OPEN = "rtp_channel_open" + EVENT_RTP_CHANNEL_CLOSE = "rtp_channel_close" + def __init__( self, protocol: Protocol, @@ -2080,52 +2121,65 @@ class LocalStreamEndPoint(StreamEndPoint, utils.EventEmitter): self.configuration = configuration if configuration is not None else [] self.stream = None - async def start(self): - pass + async def start(self) -> None: + """[Source Only] Handles when receiving start command.""" - async def stop(self): - pass + async def stop(self) -> None: + """[Source Only] Handles when receiving stop command.""" - async def close(self): - pass + async def close(self) -> None: + """[Source Only] Handles when receiving close command.""" - def on_reconfigure_command(self, command): - pass + def on_reconfigure_command(self, command) -> Optional[Message]: + return None - def on_set_configuration_command(self, configuration): + def on_set_configuration_command(self, configuration) -> Optional[Message]: logger.debug( '<<< received configuration: ' f'{",".join([str(capability) for capability in configuration])}' ) self.configuration = configuration - self.emit('configuration') + self.emit(self.EVENT_CONFIGURATION) + return None - def on_get_configuration_command(self): + def on_get_configuration_command(self) -> Optional[Message]: return Get_Configuration_Response(self.configuration) - def on_open_command(self): - self.emit('open') + def on_open_command(self) -> Optional[Message]: + self.emit(self.EVENT_OPEN) + return None - def on_start_command(self): - self.emit('start') + def on_start_command(self) -> Optional[Message]: + self.emit(self.EVENT_START) + return None - def on_suspend_command(self): - self.emit('suspend') + def on_suspend_command(self) -> Optional[Message]: + self.emit(self.EVENT_SUSPEND) + return None - def on_close_command(self): - self.emit('close') + def on_close_command(self) -> Optional[Message]: + self.emit(self.EVENT_CLOSE) + return None - def on_abort_command(self): - self.emit('abort') + def on_abort_command(self) -> Optional[Message]: + self.emit(self.EVENT_ABORT) + return None - def on_delayreport_command(self, delay: int): - self.emit('delay_report', delay) + def on_delayreport_command(self, delay: int) -> Optional[Message]: + self.emit(self.EVENT_DELAY_REPORT, delay) + return None - def on_rtp_channel_open(self): - self.emit('rtp_channel_open') + def on_security_control_command(self, data: bytes) -> Optional[Message]: + self.emit(self.EVENT_SECURITY_CONTROL, data) + return None - def on_rtp_channel_close(self): - self.emit('rtp_channel_close') + def on_rtp_channel_open(self) -> None: + self.emit(self.EVENT_RTP_CHANNEL_OPEN) + return None + + def on_rtp_channel_close(self) -> None: + self.emit(self.EVENT_RTP_CHANNEL_CLOSE) + return None # ----------------------------------------------------------------------------- @@ -2156,13 +2210,13 @@ class LocalSource(LocalStreamEndPoint): if self.packet_pump and self.stream and self.stream.rtp_channel: return await self.packet_pump.start(self.stream.rtp_channel) - self.emit('start') + self.emit(self.EVENT_START) async def stop(self) -> None: if self.packet_pump: return await self.packet_pump.stop() - self.emit('stop') + self.emit(self.EVENT_STOP) def on_start_command(self): asyncio.create_task(self.start()) @@ -2203,4 +2257,4 @@ class LocalSink(LocalStreamEndPoint): f'{color("<<< RTP Packet:", "green")} ' f'{rtp_packet} {rtp_packet.payload[:16].hex()}' ) - self.emit('rtp_packet', rtp_packet) + self.emit(self.EVENT_RTP_PACKET, rtp_packet) diff --git a/bumble/avrcp.py b/bumble/avrcp.py index b56e604..919e23e 100644 --- a/bumble/avrcp.py +++ b/bumble/avrcp.py @@ -996,6 +996,10 @@ class Delegate: class Protocol(utils.EventEmitter): """AVRCP Controller and Target protocol.""" + EVENT_CONNECTION = "connection" + EVENT_START = "start" + EVENT_STOP = "stop" + class PacketType(enum.IntEnum): SINGLE = 0b00 START = 0b01 @@ -1456,9 +1460,11 @@ class Protocol(utils.EventEmitter): def _on_avctp_connection(self, l2cap_channel: l2cap.ClassicChannel) -> None: logger.debug("AVCTP connection established") - l2cap_channel.on("open", lambda: self._on_avctp_channel_open(l2cap_channel)) + l2cap_channel.on( + l2cap_channel.EVENT_OPEN, lambda: self._on_avctp_channel_open(l2cap_channel) + ) - self.emit("connection") + self.emit(self.EVENT_CONNECTION) def _on_avctp_channel_open(self, l2cap_channel: l2cap.ClassicChannel) -> None: logger.debug("AVCTP channel open") @@ -1473,15 +1479,15 @@ class Protocol(utils.EventEmitter): self.avctp_protocol.register_response_handler( AVRCP_PID, self._on_avctp_response ) - l2cap_channel.on("close", self._on_avctp_channel_close) + l2cap_channel.on(l2cap_channel.EVENT_CLOSE, self._on_avctp_channel_close) - self.emit("start") + self.emit(self.EVENT_START) def _on_avctp_channel_close(self) -> None: logger.debug("AVCTP channel closed") self.avctp_protocol = None - self.emit("stop") + self.emit(self.EVENT_STOP) def _on_avctp_command( self, transaction_label: int, command: avc.CommandFrame diff --git a/bumble/device.py b/bumble/device.py index 97a32f3..7d1d91d 100644 --- a/bumble/device.py +++ b/bumble/device.py @@ -581,6 +581,12 @@ class AdvertisingSet(utils.EventEmitter): enabled: bool = False periodic_enabled: bool = False + EVENT_START = "start" + EVENT_STOP = "stop" + EVENT_START_PERIODIC = "start_periodic" + EVENT_STOP_PERIODIC = "stop_periodic" + EVENT_TERMINATION = "termination" + def __post_init__(self) -> None: super().__init__() @@ -731,7 +737,7 @@ class AdvertisingSet(utils.EventEmitter): ) self.enabled = True - self.emit('start') + self.emit(self.EVENT_START) async def stop(self) -> None: await self.device.send_command( @@ -745,7 +751,7 @@ class AdvertisingSet(utils.EventEmitter): ) self.enabled = False - self.emit('stop') + self.emit(self.EVENT_STOP) async def start_periodic(self, include_adi: bool = False) -> None: if self.periodic_enabled: @@ -759,7 +765,7 @@ class AdvertisingSet(utils.EventEmitter): ) self.periodic_enabled = True - self.emit('start_periodic') + self.emit(self.EVENT_START_PERIODIC) async def stop_periodic(self) -> None: if not self.periodic_enabled: @@ -773,7 +779,7 @@ class AdvertisingSet(utils.EventEmitter): ) self.periodic_enabled = False - self.emit('stop_periodic') + self.emit(self.EVENT_STOP_PERIODIC) async def remove(self) -> None: await self.device.send_command( @@ -797,7 +803,7 @@ class AdvertisingSet(utils.EventEmitter): def on_termination(self, status: int) -> None: self.enabled = False - self.emit('termination', status) + self.emit(self.EVENT_TERMINATION, status) # ----------------------------------------------------------------------------- @@ -823,6 +829,14 @@ class PeriodicAdvertisingSync(utils.EventEmitter): periodic_advertising_interval: int advertiser_clock_accuracy: int + EVENT_STATE_CHANGE = "state_change" + EVENT_ESTABLISHMENT = "establishment" + EVENT_CANCELLATION = "cancellation" + EVENT_ERROR = "error" + EVENT_LOSS = "loss" + EVENT_PERIODIC_ADVERTISEMENT = "periodic_advertisement" + EVENT_BIGINFO_ADVERTISEMENT = "biginfo_advertisement" + def __init__( self, device: Device, @@ -855,7 +869,7 @@ class PeriodicAdvertisingSync(utils.EventEmitter): def state(self, state: State) -> None: logger.debug(f'{self} -> {state.name}') self._state = state - self.emit('state_change') + self.emit(self.EVENT_STATE_CHANGE) async def establish(self) -> None: if self.state != self.State.INIT: @@ -939,7 +953,7 @@ class PeriodicAdvertisingSync(utils.EventEmitter): self.periodic_advertising_interval = periodic_advertising_interval self.advertiser_clock_accuracy = advertiser_clock_accuracy self.state = self.State.ESTABLISHED - self.emit('establishment') + self.emit(self.EVENT_ESTABLISHMENT) return # We don't need to keep a reference anymore @@ -948,15 +962,15 @@ class PeriodicAdvertisingSync(utils.EventEmitter): if status == hci.HCI_OPERATION_CANCELLED_BY_HOST_ERROR: self.state = self.State.CANCELLED - self.emit('cancellation') + self.emit(self.EVENT_CANCELLATION) return self.state = self.State.ERROR - self.emit('error') + self.emit(self.EVENT_ERROR) def on_loss(self): self.state = self.State.LOST - self.emit('loss') + self.emit(self.EVENT_LOSS) def on_periodic_advertising_report(self, report) -> None: self.data_accumulator += report.data @@ -967,7 +981,7 @@ class PeriodicAdvertisingSync(utils.EventEmitter): return self.emit( - 'periodic_advertisement', + self.EVENT_PERIODIC_ADVERTISEMENT, PeriodicAdvertisement( self.advertiser_address, self.sid, @@ -984,7 +998,7 @@ class PeriodicAdvertisingSync(utils.EventEmitter): def on_biginfo_advertising_report(self, report) -> None: self.emit( - 'biginfo_advertisement', + self.EVENT_BIGINFO_ADVERTISEMENT, BIGInfoAdvertisement.from_report(self.advertiser_address, self.sid, report), ) @@ -1222,7 +1236,7 @@ class Peer: async def request_mtu(self, mtu: int) -> int: mtu = await self.gatt_client.request_mtu(mtu) - self.connection.emit('connection_att_mtu_update') + self.connection.emit(self.connection.EVENT_CONNECTION_ATT_MTU_UPDATE) return mtu async def discover_service( @@ -1390,6 +1404,9 @@ class ScoLink(utils.CompositeEventEmitter): link_type: int sink: Optional[Callable[[hci.HCI_SynchronousDataPacket], Any]] = None + EVENT_DISCONNECTION: ClassVar[str] = "disconnection" + EVENT_DISCONNECTION_FAILURE: ClassVar[str] = "disconnection_failure" + def __post_init__(self) -> None: super().__init__() @@ -1487,6 +1504,11 @@ class CisLink(utils.EventEmitter, _IsoLink): state: State = State.PENDING sink: Callable[[hci.HCI_IsoDataPacket], Any] | None = None + EVENT_DISCONNECTION: ClassVar[str] = "disconnection" + EVENT_DISCONNECTION_FAILURE: ClassVar[str] = "disconnection_failure" + EVENT_ESTABLISHMENT: ClassVar[str] = "establishment" + EVENT_ESTABLISHMENT_FAILURE: ClassVar[str] = "establishment_failure" + def __post_init__(self) -> None: super().__init__() @@ -1571,6 +1593,40 @@ class Connection(utils.CompositeEventEmitter): cs_configs: dict[int, ChannelSoundingConfig] # Config ID to Configuration cs_procedures: dict[int, ChannelSoundingProcedure] # Config ID to Procedures + EVENT_CONNECTION_ATT_MTU_UPDATE = "connection_att_mtu_update" + EVENT_DISCONNECTION = "disconnection" + EVENT_DISCONNECTION_FAILURE = "disconnection_failure" + EVENT_CONNECTION_AUTHENTICATION = "connection_authentication" + EVENT_CONNECTION_AUTHENTICATION_FAILURE = "connection_authentication_failure" + EVENT_REMOTE_NAME = "remote_name" + EVENT_REMOTE_NAME_FAILURE = "remote_name_failure" + EVENT_CONNECTION_ENCRYPTION_CHANGE = "connection_encryption_change" + EVENT_CONNECTION_ENCRYPTION_FAILURE = "connection_encryption_failure" + EVENT_CONNECTION_ENCRYPTION_KEY_REFRESH = "connection_encryption_key_refresh" + EVENT_CONNECTION_PARAMETERS_UPDATE = "connection_parameters_update" + EVENT_CONNECTION_PARAMETERS_UPDATE_FAILURE = "connection_parameters_update_failure" + EVENT_CONNECTION_PHY_UPDATE = "connection_phy_update" + EVENT_CONNECTION_PHY_UPDATE_FAILURE = "connection_phy_update_failure" + EVENT_CONNECTION_ATT_MTU_UPDATE = "connection_att_mtu_update" + EVENT_CONNECTION_DATA_LENGTH_CHANGE = "connection_data_length_change" + EVENT_CHANNEL_SOUNDING_CAPABILITIES_FAILURE = ( + "channel_sounding_capabilities_failure" + ) + EVENT_CHANNEL_SOUNDING_CAPABILITIES = "channel_sounding_capabilities" + EVENT_CHANNEL_SOUNDING_CONFIG_FAILURE = "channel_sounding_config_failure" + EVENT_CHANNEL_SOUNDING_CONFIG = "channel_sounding_config" + EVENT_CHANNEL_SOUNDING_CONFIG_REMOVED = "channel_sounding_config_removed" + EVENT_CHANNEL_SOUNDING_PROCEDURE_FAILURE = "channel_sounding_procedure_failure" + EVENT_CHANNEL_SOUNDING_PROCEDURE = "channel_sounding_procedure" + EVENT_ROLE_CHANGE = "role_change" + EVENT_ROLE_CHANGE_FAILURE = "role_change_failure" + EVENT_CLASSIC_PAIRING = "classic_pairing" + EVENT_CLASSIC_PAIRING_FAILURE = "classic_pairing_failure" + EVENT_PAIRING_START = "pairing_start" + EVENT_PAIRING = "pairing" + EVENT_PAIRING_FAILURE = "pairing_failure" + EVENT_SECURITY_REQUEST = "security_request" + @utils.composite_listener class Listener: def on_disconnection(self, reason): @@ -1740,16 +1796,16 @@ class Connection(utils.CompositeEventEmitter): """Idles the current task waiting for a disconnect or timeout""" abort = asyncio.get_running_loop().create_future() - self.on('disconnection', abort.set_result) - self.on('disconnection_failure', abort.set_exception) + self.on(self.EVENT_DISCONNECTION, abort.set_result) + self.on(self.EVENT_DISCONNECTION_FAILURE, abort.set_exception) try: await asyncio.wait_for( utils.cancel_on_event(self.device, 'flush', abort), timeout ) finally: - self.remove_listener('disconnection', abort.set_result) - self.remove_listener('disconnection_failure', abort.set_exception) + self.remove_listener(self.EVENT_DISCONNECTION, abort.set_result) + self.remove_listener(self.EVENT_DISCONNECTION_FAILURE, abort.set_exception) async def set_data_length(self, tx_octets, tx_time) -> None: return await self.device.set_data_length(self, tx_octets, tx_time) @@ -2062,6 +2118,26 @@ class Device(utils.CompositeEventEmitter): _pending_cis: Dict[int, tuple[int, int]] gatt_service: gatt_service.GenericAttributeProfileService | None = None + EVENT_ADVERTISEMENT = "advertisement" + EVENT_PERIODIC_ADVERTISING_SYNC_TRANSFER = "periodic_advertising_sync_transfer" + EVENT_KEY_STORE_UPDATE = "key_store_update" + EVENT_FLUSH = "flush" + EVENT_CONNECTION = "connection" + EVENT_CONNECTION_FAILURE = "connection_failure" + EVENT_SCO_REQUEST = "sco_request" + EVENT_INQUIRY_COMPLETE = "inquiry_complete" + EVENT_REMOTE_NAME = "remote_name" + EVENT_REMOTE_NAME_FAILURE = "remote_name_failure" + EVENT_SCO_CONNECTION = "sco_connection" + EVENT_SCO_CONNECTION_FAILURE = "sco_connection_failure" + EVENT_CIS_REQUEST = "cis_request" + EVENT_CIS_ESTABLISHMENT = "cis_establishment" + EVENT_CIS_ESTABLISHMENT_FAILURE = "cis_establishment_failure" + EVENT_ROLE_CHANGE_FAILURE = "role_change_failure" + EVENT_INQUIRY_RESULT = "inquiry_result" + EVENT_REMOTE_NAME = "remote_name" + EVENT_REMOTE_NAME_FAILURE = "remote_name_failure" + @utils.composite_listener class Listener: def on_advertisement(self, advertisement): @@ -3149,7 +3225,7 @@ class Device(utils.CompositeEventEmitter): accumulator = AdvertisementDataAccumulator(passive=self.scanning_is_passive) self.advertisement_accumulators[report.address] = accumulator if advertisement := accumulator.update(report): - self.emit('advertisement', advertisement) + self.emit(self.EVENT_ADVERTISEMENT, advertisement) async def create_periodic_advertising_sync( self, @@ -3273,7 +3349,7 @@ class Device(utils.CompositeEventEmitter): periodic_advertising_interval=periodic_advertising_interval, advertiser_clock_accuracy=advertiser_clock_accuracy, ) - self.emit('periodic_advertising_sync_transfer', pa_sync, connection) + self.emit(self.EVENT_PERIODIC_ADVERTISING_SYNC_TRANSFER, pa_sync, connection) @host_event_handler @with_periodic_advertising_sync_from_handle @@ -3331,7 +3407,7 @@ class Device(utils.CompositeEventEmitter): @host_event_handler def on_inquiry_result(self, address, class_of_device, data, rssi): self.emit( - 'inquiry_result', + self.EVENT_INQUIRY_RESULT, address, class_of_device, AdvertisingData.from_bytes(data), @@ -3508,8 +3584,8 @@ class Device(utils.CompositeEventEmitter): # Create a future so that we can wait for the connection's result pending_connection = asyncio.get_running_loop().create_future() - self.on('connection', on_connection) - self.on('connection_failure', on_connection_failure) + self.on(self.EVENT_CONNECTION, on_connection) + self.on(self.EVENT_CONNECTION_FAILURE, on_connection_failure) try: # Tell the controller to connect @@ -3685,8 +3761,8 @@ class Device(utils.CompositeEventEmitter): except core.ConnectionError as error: raise core.TimeoutError() from error finally: - self.remove_listener('connection', on_connection) - self.remove_listener('connection_failure', on_connection_failure) + self.remove_listener(self.EVENT_CONNECTION, on_connection) + self.remove_listener(self.EVENT_CONNECTION_FAILURE, on_connection_failure) if transport == PhysicalTransport.LE: self.le_connecting = False self.connect_own_address_type = None @@ -3779,8 +3855,8 @@ class Device(utils.CompositeEventEmitter): ): pending_connection.set_exception(error) - self.on('connection', on_connection) - self.on('connection_failure', on_connection_failure) + self.on(self.EVENT_CONNECTION, on_connection) + self.on(self.EVENT_CONNECTION_FAILURE, on_connection_failure) # Save pending connection, with the Peripheral hci.role. # Even if we requested a role switch in the hci.HCI_Accept_Connection_Request @@ -3802,8 +3878,8 @@ class Device(utils.CompositeEventEmitter): return await utils.cancel_on_event(self, 'flush', pending_connection) finally: - self.remove_listener('connection', on_connection) - self.remove_listener('connection_failure', on_connection_failure) + self.remove_listener(self.EVENT_CONNECTION, on_connection) + self.remove_listener(self.EVENT_CONNECTION_FAILURE, on_connection_failure) self.pending_connections.pop(peer_address, None) @asynccontextmanager @@ -3857,8 +3933,10 @@ class Device(utils.CompositeEventEmitter): ) -> None: # Create a future so that we can wait for the disconnection's result pending_disconnection = asyncio.get_running_loop().create_future() - connection.on('disconnection', pending_disconnection.set_result) - connection.on('disconnection_failure', pending_disconnection.set_exception) + connection.on(connection.EVENT_DISCONNECTION, pending_disconnection.set_result) + connection.on( + connection.EVENT_DISCONNECTION_FAILURE, pending_disconnection.set_exception + ) # Request a disconnection result = await self.send_command( @@ -3876,10 +3954,11 @@ class Device(utils.CompositeEventEmitter): return await utils.cancel_on_event(self, 'flush', pending_disconnection) finally: connection.remove_listener( - 'disconnection', pending_disconnection.set_result + connection.EVENT_DISCONNECTION, pending_disconnection.set_result ) connection.remove_listener( - 'disconnection_failure', pending_disconnection.set_exception + connection.EVENT_DISCONNECTION_FAILURE, + pending_disconnection.set_exception, ) self.disconnecting = False @@ -4198,7 +4277,7 @@ class Device(utils.CompositeEventEmitter): return keys.link_key.value # [Classic only] - async def authenticate(self, connection): + async def authenticate(self, connection: Connection) -> None: # Set up event handlers pending_authentication = asyncio.get_running_loop().create_future() @@ -4208,8 +4287,11 @@ class Device(utils.CompositeEventEmitter): def on_authentication_failure(error_code): pending_authentication.set_exception(hci.HCI_Error(error_code)) - connection.on('connection_authentication', on_authentication) - connection.on('connection_authentication_failure', on_authentication_failure) + connection.on(connection.EVENT_CONNECTION_AUTHENTICATION, on_authentication) + connection.on( + connection.EVENT_CONNECTION_AUTHENTICATION_FAILURE, + on_authentication_failure, + ) # Request the authentication try: @@ -4230,9 +4312,12 @@ class Device(utils.CompositeEventEmitter): connection, 'disconnection', pending_authentication ) finally: - connection.remove_listener('connection_authentication', on_authentication) connection.remove_listener( - 'connection_authentication_failure', on_authentication_failure + connection.EVENT_CONNECTION_AUTHENTICATION, on_authentication + ) + connection.remove_listener( + connection.EVENT_CONNECTION_AUTHENTICATION_FAILURE, + on_authentication_failure, ) async def encrypt(self, connection, enable=True): @@ -4248,8 +4333,12 @@ class Device(utils.CompositeEventEmitter): def on_encryption_failure(error_code): pending_encryption.set_exception(hci.HCI_Error(error_code)) - connection.on('connection_encryption_change', on_encryption_change) - connection.on('connection_encryption_failure', on_encryption_failure) + connection.on( + connection.EVENT_CONNECTION_ENCRYPTION_CHANGE, on_encryption_change + ) + connection.on( + connection.EVENT_CONNECTION_ENCRYPTION_FAILURE, on_encryption_failure + ) # Request the encryption try: @@ -4311,10 +4400,10 @@ class Device(utils.CompositeEventEmitter): await utils.cancel_on_event(connection, 'disconnection', pending_encryption) finally: connection.remove_listener( - 'connection_encryption_change', on_encryption_change + connection.EVENT_CONNECTION_ENCRYPTION_CHANGE, on_encryption_change ) connection.remove_listener( - 'connection_encryption_failure', on_encryption_failure + connection.EVENT_CONNECTION_ENCRYPTION_FAILURE, on_encryption_failure ) async def update_keys(self, address: str, keys: PairingKeys) -> None: @@ -4327,7 +4416,7 @@ class Device(utils.CompositeEventEmitter): except Exception as error: logger.warning(f'!!! error while storing keys: {error}') else: - self.emit('key_store_update') + self.emit(self.EVENT_KEY_STORE_UPDATE) # [Classic only] async def switch_role(self, connection: Connection, role: hci.Role): @@ -4339,8 +4428,8 @@ class Device(utils.CompositeEventEmitter): def on_role_change_failure(error_code): pending_role_change.set_exception(hci.HCI_Error(error_code)) - connection.on('role_change', on_role_change) - connection.on('role_change_failure', on_role_change_failure) + connection.on(connection.EVENT_ROLE_CHANGE, on_role_change) + connection.on(connection.EVENT_ROLE_CHANGE_FAILURE, on_role_change_failure) try: result = await self.send_command( @@ -4356,8 +4445,10 @@ class Device(utils.CompositeEventEmitter): connection, 'disconnection', pending_role_change ) finally: - connection.remove_listener('role_change', on_role_change) - connection.remove_listener('role_change_failure', on_role_change_failure) + connection.remove_listener(connection.EVENT_ROLE_CHANGE, on_role_change) + connection.remove_listener( + connection.EVENT_ROLE_CHANGE_FAILURE, on_role_change_failure + ) # [Classic only] async def request_remote_name(self, remote: Union[hci.Address, Connection]) -> str: @@ -4369,7 +4460,7 @@ class Device(utils.CompositeEventEmitter): ) handler = self.on( - 'remote_name', + self.EVENT_REMOTE_NAME, lambda address, remote_name: ( pending_name.set_result(remote_name) if address == peer_address @@ -4377,7 +4468,7 @@ class Device(utils.CompositeEventEmitter): ), ) failure_handler = self.on( - 'remote_name_failure', + self.EVENT_REMOTE_NAME_FAILURE, lambda address, error_code: ( pending_name.set_exception(hci.HCI_Error(error_code)) if address == peer_address @@ -4405,8 +4496,8 @@ class Device(utils.CompositeEventEmitter): # Wait for the result return await utils.cancel_on_event(self, 'flush', pending_name) finally: - self.remove_listener('remote_name', handler) - self.remove_listener('remote_name_failure', failure_handler) + self.remove_listener(self.EVENT_REMOTE_NAME, handler) + self.remove_listener(self.EVENT_REMOTE_NAME_FAILURE, failure_handler) # [LE only] @utils.experimental('Only for testing.') @@ -4497,8 +4588,10 @@ class Device(utils.CompositeEventEmitter): if pending_future := pending_cis_establishments.get(cis_handle): pending_future.set_exception(hci.HCI_Error(status)) - watcher.on(self, 'cis_establishment', on_cis_establishment) - watcher.on(self, 'cis_establishment_failure', on_cis_establishment_failure) + watcher.on(self, self.EVENT_CIS_ESTABLISHMENT, on_cis_establishment) + watcher.on( + self, self.EVENT_CIS_ESTABLISHMENT_FAILURE, on_cis_establishment_failure + ) await self.send_command( hci.HCI_LE_Create_CIS_Command( cis_connection_handle=[p[0] for p in cis_acl_pairs], @@ -4541,8 +4634,12 @@ class Device(utils.CompositeEventEmitter): def on_establishment_failure(status: int) -> None: pending_establishment.set_exception(hci.HCI_Error(status)) - watcher.on(cis_link, 'establishment', on_establishment) - watcher.on(cis_link, 'establishment_failure', on_establishment_failure) + watcher.on(cis_link, cis_link.EVENT_ESTABLISHMENT, on_establishment) + watcher.on( + cis_link, + cis_link.EVENT_ESTABLISHMENT_FAILURE, + on_establishment_failure, + ) await self.send_command( hci.HCI_LE_Accept_CIS_Request_Command(connection_handle=handle), @@ -4910,9 +5007,9 @@ class Device(utils.CompositeEventEmitter): @host_event_handler def on_flush(self): - self.emit('flush') + self.emit(self.EVENT_FLUSH) for _, connection in self.connections.items(): - connection.emit('disconnection', 0) + connection.emit(connection.EVENT_DISCONNECTION, 0) self.connections = {} # [Classic only] @@ -5203,7 +5300,7 @@ class Device(utils.CompositeEventEmitter): lambda _: utils.cancel_on_event(self, 'flush', advertising_set.start()), ) - self.emit('connection', connection) + self.emit(self.EVENT_CONNECTION, connection) @host_event_handler def on_connection( @@ -5241,7 +5338,7 @@ class Device(utils.CompositeEventEmitter): self.connections[connection_handle] = connection # Emit an event to notify listeners of the new connection - self.emit('connection', connection) + self.emit(self.EVENT_CONNECTION, connection) return @@ -5316,7 +5413,7 @@ class Device(utils.CompositeEventEmitter): if role == hci.Role.CENTRAL or not self.supports_le_extended_advertising: # We can emit now, we have all the info we need - self.emit('connection', connection) + self.emit(self.EVENT_CONNECTION, connection) return if role == hci.Role.PERIPHERAL and self.supports_le_extended_advertising: @@ -5350,7 +5447,7 @@ class Device(utils.CompositeEventEmitter): 'hci', hci.HCI_Constant.error_name(error_code), ) - self.emit('connection_failure', error) + self.emit(self.EVENT_CONNECTION_FAILURE, error) # FIXME: Explore a delegate-model for BR/EDR wait connection #56. @host_event_handler @@ -5365,7 +5462,7 @@ class Device(utils.CompositeEventEmitter): if connection := self.find_connection_by_bd_addr( bd_addr, transport=PhysicalTransport.BR_EDR ): - self.emit('sco_request', connection, link_type) + self.emit(self.EVENT_SCO_REQUEST, connection, link_type) else: logger.error(f'SCO request from a non-connected device {bd_addr}') return @@ -5409,14 +5506,14 @@ class Device(utils.CompositeEventEmitter): f'*** Disconnection: [0x{connection.handle:04X}] ' f'{connection.peer_address} as {connection.role_name}, reason={reason}' ) - connection.emit('disconnection', reason) + connection.emit(connection.EVENT_DISCONNECTION, reason) # Cleanup subsystems that maintain per-connection state self.gatt_server.on_disconnection(connection) elif sco_link := self.sco_links.pop(connection_handle, None): - sco_link.emit('disconnection', reason) + sco_link.emit(sco_link.EVENT_DISCONNECTION, reason) elif cis_link := self.cis_links.pop(connection_handle, None): - cis_link.emit('disconnection', reason) + cis_link.emit(cis_link.EVENT_DISCONNECTION, reason) else: logger.error( f'*** Unknown disconnection handle=0x{connection_handle}, reason={reason} ***' @@ -5424,7 +5521,7 @@ class Device(utils.CompositeEventEmitter): @host_event_handler @with_connection_from_handle - def on_disconnection_failure(self, connection, error_code): + def on_disconnection_failure(self, connection: Connection, error_code: int): logger.debug(f'*** Disconnection failed: {error_code}') error = core.ConnectionError( error_code, @@ -5433,7 +5530,7 @@ class Device(utils.CompositeEventEmitter): 'hci', hci.HCI_Constant.error_name(error_code), ) - connection.emit('disconnection_failure', error) + connection.emit(connection.EVENT_DISCONNECTION_FAILURE, error) @host_event_handler @utils.AsyncRunner.run_in_task() @@ -5444,7 +5541,7 @@ class Device(utils.CompositeEventEmitter): else: self.auto_restart_inquiry = True self.discovering = False - self.emit('inquiry_complete') + self.emit(self.EVENT_INQUIRY_COMPLETE) @host_event_handler @with_connection_from_handle @@ -5454,7 +5551,7 @@ class Device(utils.CompositeEventEmitter): f'{connection.peer_address} as {connection.role_name}' ) connection.authenticated = True - connection.emit('connection_authentication') + connection.emit(connection.EVENT_CONNECTION_AUTHENTICATION) @host_event_handler @with_connection_from_handle @@ -5463,7 +5560,7 @@ class Device(utils.CompositeEventEmitter): f'*** Connection Authentication Failure: [0x{connection.handle:04X}] ' f'{connection.peer_address} as {connection.role_name}, error={error}' ) - connection.emit('connection_authentication_failure', error) + connection.emit(connection.EVENT_CONNECTION_AUTHENTICATION_FAILURE, error) # [Classic only] @host_event_handler @@ -5681,22 +5778,22 @@ class Device(utils.CompositeEventEmitter): remote_name = remote_name.decode('utf-8') if connection: connection.peer_name = remote_name - connection.emit('remote_name') - self.emit('remote_name', address, remote_name) + connection.emit(connection.EVENT_REMOTE_NAME) + self.emit(self.EVENT_REMOTE_NAME, address, remote_name) except UnicodeDecodeError as error: logger.warning('peer name is not valid UTF-8') if connection: - connection.emit('remote_name_failure', error) + connection.emit(connection.EVENT_REMOTE_NAME_FAILURE, error) else: - self.emit('remote_name_failure', address, error) + self.emit(self.EVENT_REMOTE_NAME_FAILURE, address, error) # [Classic only] @host_event_handler @try_with_connection_from_address def on_remote_name_failure(self, connection: Connection, address, error): if connection: - connection.emit('remote_name_failure', error) - self.emit('remote_name_failure', address, error) + connection.emit(connection.EVENT_REMOTE_NAME_FAILURE, error) + self.emit(self.EVENT_REMOTE_NAME_FAILURE, address, error) # [Classic only] @host_event_handler @@ -5716,7 +5813,7 @@ class Device(utils.CompositeEventEmitter): handle=sco_handle, link_type=link_type, ) - self.emit('sco_connection', sco_link) + self.emit(self.EVENT_SCO_CONNECTION, sco_link) # [Classic only] @host_event_handler @@ -5726,7 +5823,7 @@ class Device(utils.CompositeEventEmitter): self, acl_connection: Connection, status: int ) -> None: logger.debug(f'*** SCO connection failure: {acl_connection.peer_address}***') - self.emit('sco_connection_failure') + self.emit(self.EVENT_SCO_CONNECTION_FAILURE) # [Classic only] @host_event_handler @@ -5763,7 +5860,7 @@ class Device(utils.CompositeEventEmitter): cig_id=cig_id, cis_id=cis_id, ) - self.emit('cis_request', acl_connection, cis_handle, cig_id, cis_id) + self.emit(self.EVENT_CIS_REQUEST, acl_connection, cis_handle, cig_id, cis_id) # [LE only] @host_event_handler @@ -5782,8 +5879,8 @@ class Device(utils.CompositeEventEmitter): f'cis_id=[0x{cis_link.cis_id:02X}] ***' ) - cis_link.emit('establishment') - self.emit('cis_establishment', cis_link) + cis_link.emit(cis_link.EVENT_ESTABLISHMENT) + self.emit(self.EVENT_CIS_ESTABLISHMENT, cis_link) # [LE only] @host_event_handler @@ -5791,8 +5888,8 @@ class Device(utils.CompositeEventEmitter): def on_cis_establishment_failure(self, cis_handle: int, status: int) -> None: logger.debug(f'*** CIS Establishment Failure: cis=[0x{cis_handle:04X}] ***') if cis_link := self.cis_links.pop(cis_handle): - cis_link.emit('establishment_failure', status) - self.emit('cis_establishment_failure', cis_handle, status) + cis_link.emit(cis_link.EVENT_ESTABLISHMENT_FAILURE, status) + self.emit(self.EVENT_CIS_ESTABLISHMENT_FAILURE, cis_handle, status) # [LE only] @host_event_handler @@ -5826,7 +5923,7 @@ class Device(utils.CompositeEventEmitter): ): connection.authenticated = True connection.sc = True - connection.emit('connection_encryption_change') + connection.emit(connection.EVENT_CONNECTION_ENCRYPTION_CHANGE) @host_event_handler @with_connection_from_handle @@ -5836,7 +5933,7 @@ class Device(utils.CompositeEventEmitter): f'{connection.peer_address} as {connection.role_name}, ' f'error={error}' ) - connection.emit('connection_encryption_failure', error) + connection.emit(connection.EVENT_CONNECTION_ENCRYPTION_FAILURE, error) @host_event_handler @with_connection_from_handle @@ -5845,7 +5942,7 @@ class Device(utils.CompositeEventEmitter): f'*** Connection Key Refresh: [0x{connection.handle:04X}] ' f'{connection.peer_address} as {connection.role_name}' ) - connection.emit('connection_encryption_key_refresh') + connection.emit(connection.EVENT_CONNECTION_ENCRYPTION_KEY_REFRESH) @host_event_handler @with_connection_from_handle @@ -5856,7 +5953,7 @@ class Device(utils.CompositeEventEmitter): f'{connection_parameters}' ) connection.parameters = connection_parameters - connection.emit('connection_parameters_update') + connection.emit(connection.EVENT_CONNECTION_PARAMETERS_UPDATE) @host_event_handler @with_connection_from_handle @@ -5866,7 +5963,7 @@ class Device(utils.CompositeEventEmitter): f'{connection.peer_address} as {connection.role_name}, ' f'error={error}' ) - connection.emit('connection_parameters_update_failure', error) + connection.emit(connection.EVENT_CONNECTION_PARAMETERS_UPDATE_FAILURE, error) @host_event_handler @with_connection_from_handle @@ -5876,7 +5973,7 @@ class Device(utils.CompositeEventEmitter): f'{connection.peer_address} as {connection.role_name}, ' f'{phy}' ) - connection.emit('connection_phy_update', phy) + connection.emit(connection.EVENT_CONNECTION_PHY_UPDATE, phy) @host_event_handler @with_connection_from_handle @@ -5886,7 +5983,7 @@ class Device(utils.CompositeEventEmitter): f'{connection.peer_address} as {connection.role_name}, ' f'error={error}' ) - connection.emit('connection_phy_update_failure', error) + connection.emit(connection.EVENT_CONNECTION_PHY_UPDATE_FAILURE, error) @host_event_handler @with_connection_from_handle @@ -5897,7 +5994,7 @@ class Device(utils.CompositeEventEmitter): f'{att_mtu}' ) connection.att_mtu = att_mtu - connection.emit('connection_att_mtu_update') + connection.emit(connection.EVENT_CONNECTION_ATT_MTU_UPDATE) @host_event_handler @with_connection_from_handle @@ -5914,7 +6011,7 @@ class Device(utils.CompositeEventEmitter): max_rx_octets, max_rx_time, ) - connection.emit('connection_data_length_change') + connection.emit(connection.EVENT_CONNECTION_DATA_LENGTH_CHANGE) @host_event_handler def on_cs_remote_supported_capabilities( @@ -5924,7 +6021,9 @@ class Device(utils.CompositeEventEmitter): return if event.status != hci.HCI_SUCCESS: - connection.emit('channel_sounding_capabilities_failure', event.status) + connection.emit( + connection.EVENT_CHANNEL_SOUNDING_CAPABILITIES_FAILURE, event.status + ) return capabilities = ChannelSoundingCapabilities( @@ -5949,7 +6048,7 @@ class Device(utils.CompositeEventEmitter): t_sw_time_supported=event.t_sw_time_supported, tx_snr_capability=event.tx_snr_capability, ) - connection.emit('channel_sounding_capabilities', capabilities) + connection.emit(connection.EVENT_CHANNEL_SOUNDING_CAPABILITIES, capabilities) @host_event_handler def on_cs_config(self, event: hci.HCI_LE_CS_Config_Complete_Event): @@ -5957,7 +6056,9 @@ class Device(utils.CompositeEventEmitter): return if event.status != hci.HCI_SUCCESS: - connection.emit('channel_sounding_config_failure', event.status) + connection.emit( + connection.EVENT_CHANNEL_SOUNDING_CONFIG_FAILURE, event.status + ) return if event.action == hci.HCI_LE_CS_Config_Complete_Event.Action.CREATED: config = ChannelSoundingConfig( @@ -5983,11 +6084,13 @@ class Device(utils.CompositeEventEmitter): t_pm_time=event.t_pm_time, ) connection.cs_configs[event.config_id] = config - connection.emit('channel_sounding_config', config) + connection.emit(connection.EVENT_CHANNEL_SOUNDING_CONFIG, config) elif event.action == hci.HCI_LE_CS_Config_Complete_Event.Action.REMOVED: try: config = connection.cs_configs.pop(event.config_id) - connection.emit('channel_sounding_config_removed', config.config_id) + connection.emit( + connection.EVENT_CHANNEL_SOUNDING_CONFIG_REMOVED, config.config_id + ) except KeyError: logger.error('Removing unknown config %d', event.config_id) @@ -5997,7 +6100,9 @@ class Device(utils.CompositeEventEmitter): return if event.status != hci.HCI_SUCCESS: - connection.emit('channel_sounding_procedure_failure', event.status) + connection.emit( + connection.EVENT_CHANNEL_SOUNDING_PROCEDURE_FAILURE, event.status + ) return procedure = ChannelSoundingProcedure( @@ -6014,37 +6119,37 @@ class Device(utils.CompositeEventEmitter): max_procedure_len=event.max_procedure_len, ) connection.cs_procedures[procedure.config_id] = procedure - connection.emit('channel_sounding_procedure', procedure) + connection.emit(connection.EVENT_CHANNEL_SOUNDING_PROCEDURE, procedure) # [Classic only] @host_event_handler @with_connection_from_address def on_role_change(self, connection, new_role): connection.role = new_role - connection.emit('role_change', new_role) + connection.emit(connection.EVENT_ROLE_CHANGE, new_role) # [Classic only] @host_event_handler @try_with_connection_from_address def on_role_change_failure(self, connection, address, error): if connection: - connection.emit('role_change_failure', error) - self.emit('role_change_failure', address, error) + connection.emit(connection.EVENT_ROLE_CHANGE_FAILURE, error) + self.emit(self.EVENT_ROLE_CHANGE_FAILURE, address, error) # [Classic only] @host_event_handler @with_connection_from_address def on_classic_pairing(self, connection: Connection) -> None: - connection.emit('classic_pairing') + connection.emit(connection.EVENT_CLASSIC_PAIRING) # [Classic only] @host_event_handler @with_connection_from_address def on_classic_pairing_failure(self, connection: Connection, status) -> None: - connection.emit('classic_pairing_failure', status) + connection.emit(connection.EVENT_CLASSIC_PAIRING_FAILURE, status) def on_pairing_start(self, connection: Connection) -> None: - connection.emit('pairing_start') + connection.emit(connection.EVENT_PAIRING_START) def on_pairing( self, @@ -6058,10 +6163,10 @@ class Device(utils.CompositeEventEmitter): connection.peer_address = identity_address connection.sc = sc connection.authenticated = True - connection.emit('pairing', keys) + connection.emit(connection.EVENT_PAIRING, keys) def on_pairing_failure(self, connection: Connection, reason: int) -> None: - connection.emit('pairing_failure', reason) + connection.emit(connection.EVENT_PAIRING_FAILURE, reason) @with_connection_from_handle def on_gatt_pdu(self, connection, pdu): diff --git a/bumble/gatt.py b/bumble/gatt.py index 3dacbf7..be75f45 100644 --- a/bumble/gatt.py +++ b/bumble/gatt.py @@ -448,6 +448,8 @@ class Characteristic(Attribute[_T]): uuid: UUID properties: Characteristic.Properties + EVENT_SUBSCRIPTION = "subscription" + class Properties(enum.IntFlag): """Property flags""" diff --git a/bumble/gatt_client.py b/bumble/gatt_client.py index fbe4e77..c2f23e3 100644 --- a/bumble/gatt_client.py +++ b/bumble/gatt_client.py @@ -202,6 +202,8 @@ class CharacteristicProxy(AttributeProxy[_T]): descriptors: List[DescriptorProxy] subscribers: Dict[Any, Callable[[_T], Any]] + EVENT_UPDATE = "update" + def __init__( self, client: Client, @@ -308,7 +310,7 @@ class Client: self.services = [] self.cached_values = {} - connection.on('disconnection', self.on_disconnection) + connection.on(connection.EVENT_DISCONNECTION, self.on_disconnection) def send_gatt_pdu(self, pdu: bytes) -> None: self.connection.send_l2cap_pdu(ATT_CID, pdu) @@ -1142,7 +1144,7 @@ class Client: if callable(subscriber): subscriber(notification.attribute_value) else: - subscriber.emit('update', notification.attribute_value) + subscriber.emit(subscriber.EVENT_UPDATE, notification.attribute_value) def on_att_handle_value_indication(self, indication): # Call all subscribers @@ -1157,7 +1159,7 @@ class Client: if callable(subscriber): subscriber(indication.attribute_value) else: - subscriber.emit('update', indication.attribute_value) + subscriber.emit(subscriber.EVENT_UPDATE, indication.attribute_value) # Confirm that we received the indication self.send_confirmation(ATT_Handle_Value_Confirmation()) diff --git a/bumble/gatt_server.py b/bumble/gatt_server.py index 9cb2ba6..ad281da 100644 --- a/bumble/gatt_server.py +++ b/bumble/gatt_server.py @@ -110,6 +110,8 @@ class Server(utils.EventEmitter): indication_semaphores: defaultdict[int, asyncio.Semaphore] pending_confirmations: defaultdict[int, Optional[asyncio.futures.Future]] + EVENT_CHARACTERISTIC_SUBSCRIPTION = "characteristic_subscription" + def __init__(self, device: Device) -> None: super().__init__() self.device = device @@ -347,10 +349,13 @@ class Server(utils.EventEmitter): notify_enabled = value[0] & 0x01 != 0 indicate_enabled = value[0] & 0x02 != 0 characteristic.emit( - 'subscription', connection, notify_enabled, indicate_enabled + characteristic.EVENT_SUBSCRIPTION, + connection, + notify_enabled, + indicate_enabled, ) self.emit( - 'characteristic_subscription', + self.EVENT_CHARACTERISTIC_SUBSCRIPTION, connection, characteristic, notify_enabled, diff --git a/bumble/hfp.py b/bumble/hfp.py index 454279e..76d622f 100644 --- a/bumble/hfp.py +++ b/bumble/hfp.py @@ -720,6 +720,14 @@ class HfProtocol(utils.EventEmitter): vrec: VoiceRecognitionState """ + EVENT_CODEC_NEGOTIATION = "codec_negotiation" + EVENT_AG_INDICATOR = "ag_indicator" + EVENT_SPEAKER_VOLUME = "speaker_volume" + EVENT_MICROPHONE_VOLUME = "microphone_volume" + EVENT_RING = "ring" + EVENT_CLI_NOTIFICATION = "cli_notification" + EVENT_VOICE_RECOGNITION = "voice_recognition" + class HfLoopTermination(HfpProtocolError): """Termination signal for run() loop.""" @@ -777,7 +785,8 @@ class HfProtocol(utils.EventEmitter): self.dlc.sink = self._read_at # Stop the run() loop when L2CAP is closed. self.dlc.multiplexer.l2cap_channel.on( - 'close', lambda: self.unsolicited_queue.put_nowait(None) + self.dlc.multiplexer.l2cap_channel.EVENT_CLOSE, + lambda: self.unsolicited_queue.put_nowait(None), ) def supports_hf_feature(self, feature: HfFeature) -> bool: @@ -1034,7 +1043,7 @@ class HfProtocol(utils.EventEmitter): # ID. The HF shall be ready to accept the synchronous connection # establishment as soon as it has sent the AT commands AT+BCS=. self.active_codec = AudioCodec(codec_id) - self.emit('codec_negotiation', self.active_codec) + self.emit(self.EVENT_CODEC_NEGOTIATION, self.active_codec) logger.info("codec connection setup completed") @@ -1095,7 +1104,7 @@ class HfProtocol(utils.EventEmitter): # CIEV is in 1-index, while ag_indicators is in 0-index. ag_indicator = self.ag_indicators[index - 1] ag_indicator.current_status = value - self.emit('ag_indicator', ag_indicator) + self.emit(self.EVENT_AG_INDICATOR, ag_indicator) logger.info(f"AG indicator updated: {ag_indicator.indicator}, {value}") async def handle_unsolicited(self): @@ -1110,19 +1119,21 @@ class HfProtocol(utils.EventEmitter): int(result.parameters[0]), int(result.parameters[1]) ) elif result.code == "+VGS": - self.emit('speaker_volume', int(result.parameters[0])) + self.emit(self.EVENT_SPEAKER_VOLUME, int(result.parameters[0])) elif result.code == "+VGM": - self.emit('microphone_volume', int(result.parameters[0])) + self.emit(self.EVENT_MICROPHONE_VOLUME, int(result.parameters[0])) elif result.code == "RING": - self.emit('ring') + self.emit(self.EVENT_RING) elif result.code == "+CLIP": self.emit( - 'cli_notification', CallLineIdentification.parse_from(result.parameters) + self.EVENT_CLI_NOTIFICATION, + CallLineIdentification.parse_from(result.parameters), ) elif result.code == "+BVRA": # TODO: Support Enhanced Voice Recognition. self.emit( - 'voice_recognition', VoiceRecognitionState(int(result.parameters[0])) + self.EVENT_VOICE_RECOGNITION, + VoiceRecognitionState(int(result.parameters[0])), ) else: logging.info(f"unhandled unsolicited response {result.code}") @@ -1179,6 +1190,19 @@ class AgProtocol(utils.EventEmitter): volume: Int """ + EVENT_SLC_COMPLETE = "slc_complete" + EVENT_SUPPORTED_AUDIO_CODECS = "supported_audio_codecs" + EVENT_CODEC_NEGOTIATION = "codec_negotiation" + EVENT_VOICE_RECOGNITION = "voice_recognition" + EVENT_CALL_HOLD = "call_hold" + EVENT_HF_INDICATOR = "hf_indicator" + EVENT_CODEC_CONNECTION_REQUEST = "codec_connection_request" + EVENT_ANSWER = "answer" + EVENT_DIAL = "dial" + EVENT_HANG_UP = "hang_up" + EVENT_SPEAKER_VOLUME = "speaker_volume" + EVENT_MICROPHONE_VOLUME = "microphone_volume" + supported_hf_features: int supported_hf_indicators: Set[HfIndicator] supported_audio_codecs: List[AudioCodec] @@ -1371,7 +1395,7 @@ class AgProtocol(utils.EventEmitter): def _check_remained_slc_commands(self) -> None: if not self._remained_slc_setup_features: - self.emit('slc_complete') + self.emit(self.EVENT_SLC_COMPLETE) def _on_brsf(self, hf_features: bytes) -> None: self.supported_hf_features = int(hf_features) @@ -1390,17 +1414,17 @@ class AgProtocol(utils.EventEmitter): def _on_bac(self, *args) -> None: self.supported_audio_codecs = [AudioCodec(int(value)) for value in args] - self.emit('supported_audio_codecs', self.supported_audio_codecs) + self.emit(self.EVENT_SUPPORTED_AUDIO_CODECS, self.supported_audio_codecs) self.send_ok() def _on_bcs(self, codec: bytes) -> None: self.active_codec = AudioCodec(int(codec)) self.send_ok() - self.emit('codec_negotiation', self.active_codec) + self.emit(self.EVENT_CODEC_NEGOTIATION, self.active_codec) def _on_bvra(self, vrec: bytes) -> None: self.send_ok() - self.emit('voice_recognition', VoiceRecognitionState(int(vrec))) + self.emit(self.EVENT_VOICE_RECOGNITION, VoiceRecognitionState(int(vrec))) def _on_chld(self, operation_code: bytes) -> None: call_index: Optional[int] = None @@ -1427,7 +1451,7 @@ class AgProtocol(utils.EventEmitter): # Real three-way calls have more complicated situations, but this is not a popular issue - let users to handle the remaining :) self.send_ok() - self.emit('call_hold', operation, call_index) + self.emit(self.EVENT_CALL_HOLD, operation, call_index) def _on_chld_test(self) -> None: if not self.supports_ag_feature(AgFeature.THREE_WAY_CALLING): @@ -1553,7 +1577,7 @@ class AgProtocol(utils.EventEmitter): return self.hf_indicators[index].current_status = int(value_bytes) - self.emit('hf_indicator', self.hf_indicators[index]) + self.emit(self.EVENT_HF_INDICATOR, self.hf_indicators[index]) self.send_ok() def _on_bia(self, *args) -> None: @@ -1562,21 +1586,21 @@ class AgProtocol(utils.EventEmitter): self.send_ok() def _on_bcc(self) -> None: - self.emit('codec_connection_request') + self.emit(self.EVENT_CODEC_CONNECTION_REQUEST) self.send_ok() def _on_a(self) -> None: """ATA handler.""" - self.emit('answer') + self.emit(self.EVENT_ANSWER) self.send_ok() def _on_d(self, number: bytes) -> None: """ATD handler.""" - self.emit('dial', number.decode()) + self.emit(self.EVENT_DIAL, number.decode()) self.send_ok() def _on_chup(self) -> None: - self.emit('hang_up') + self.emit(self.EVENT_HANG_UP) self.send_ok() def _on_clcc(self) -> None: @@ -1602,11 +1626,11 @@ class AgProtocol(utils.EventEmitter): self.send_ok() def _on_vgs(self, level: bytes) -> None: - self.emit('speaker_volume', int(level)) + self.emit(self.EVENT_SPEAKER_VOLUME, int(level)) self.send_ok() def _on_vgm(self, level: bytes) -> None: - self.emit('microphone_volume', int(level)) + self.emit(self.EVENT_MICROPHONE_VOLUME, int(level)) self.send_ok() diff --git a/bumble/hid.py b/bumble/hid.py index daaeb35..edcfc98 100644 --- a/bumble/hid.py +++ b/bumble/hid.py @@ -201,6 +201,13 @@ class HID(ABC, utils.EventEmitter): l2cap_intr_channel: Optional[l2cap.ClassicChannel] = None connection: Optional[device.Connection] = None + EVENT_INTERRUPT_DATA = "interrupt_data" + EVENT_CONTROL_DATA = "control_data" + EVENT_SUSPEND = "suspend" + EVENT_EXIT_SUSPEND = "exit_suspend" + EVENT_VIRTUAL_CABLE_UNPLUG = "virtual_cable_unplug" + EVENT_HANDSHAKE = "handshake" + class Role(enum.IntEnum): HOST = 0x00 DEVICE = 0x01 @@ -215,7 +222,7 @@ class HID(ABC, utils.EventEmitter): device.register_l2cap_server(HID_CONTROL_PSM, self.on_l2cap_connection) device.register_l2cap_server(HID_INTERRUPT_PSM, self.on_l2cap_connection) - device.on('connection', self.on_device_connection) + device.on(device.EVENT_CONNECTION, self.on_device_connection) async def connect_control_channel(self) -> None: # Create a new L2CAP connection - control channel @@ -258,15 +265,20 @@ class HID(ABC, utils.EventEmitter): def on_device_connection(self, connection: device.Connection) -> None: self.connection = connection self.remote_device_bd_address = connection.peer_address - connection.on('disconnection', self.on_device_disconnection) + connection.on(connection.EVENT_DISCONNECTION, self.on_device_disconnection) def on_device_disconnection(self, reason: int) -> None: self.connection = None def on_l2cap_connection(self, l2cap_channel: l2cap.ClassicChannel) -> None: logger.debug(f'+++ New L2CAP connection: {l2cap_channel}') - l2cap_channel.on('open', lambda: self.on_l2cap_channel_open(l2cap_channel)) - l2cap_channel.on('close', lambda: self.on_l2cap_channel_close(l2cap_channel)) + l2cap_channel.on( + l2cap_channel.EVENT_OPEN, lambda: self.on_l2cap_channel_open(l2cap_channel) + ) + l2cap_channel.on( + l2cap_channel.EVENT_CLOSE, + lambda: self.on_l2cap_channel_close(l2cap_channel), + ) def on_l2cap_channel_open(self, l2cap_channel: l2cap.ClassicChannel) -> None: if l2cap_channel.psm == HID_CONTROL_PSM: @@ -290,7 +302,7 @@ class HID(ABC, utils.EventEmitter): def on_intr_pdu(self, pdu: bytes) -> None: logger.debug(f'<<< HID INTERRUPT PDU: {pdu.hex()}') - self.emit("interrupt_data", pdu) + self.emit(self.EVENT_INTERRUPT_DATA, pdu) def send_pdu_on_ctrl(self, msg: bytes) -> None: assert self.l2cap_ctrl_channel @@ -363,17 +375,17 @@ class Device(HID): self.handle_set_protocol(pdu) elif message_type == Message.MessageType.DATA: logger.debug('<<< HID CONTROL DATA') - self.emit('control_data', pdu) + self.emit(self.EVENT_CONTROL_DATA, pdu) elif message_type == Message.MessageType.CONTROL: if param == Message.ControlCommand.SUSPEND: logger.debug('<<< HID SUSPEND') - self.emit('suspend') + self.emit(self.EVENT_SUSPEND) elif param == Message.ControlCommand.EXIT_SUSPEND: logger.debug('<<< HID EXIT SUSPEND') - self.emit('exit_suspend') + self.emit(self.EVENT_EXIT_SUSPEND) elif param == Message.ControlCommand.VIRTUAL_CABLE_UNPLUG: logger.debug('<<< HID VIRTUAL CABLE UNPLUG') - self.emit('virtual_cable_unplug') + self.emit(self.EVENT_VIRTUAL_CABLE_UNPLUG) else: logger.debug('<<< HID CONTROL OPERATION UNSUPPORTED') else: @@ -538,14 +550,14 @@ class Host(HID): message_type = pdu[0] >> 4 if message_type == Message.MessageType.HANDSHAKE: logger.debug(f'<<< HID HANDSHAKE: {Message.Handshake(param).name}') - self.emit('handshake', Message.Handshake(param)) + self.emit(self.EVENT_HANDSHAKE, Message.Handshake(param)) elif message_type == Message.MessageType.DATA: logger.debug('<<< HID CONTROL DATA') - self.emit('control_data', pdu) + self.emit(self.EVENT_CONTROL_DATA, pdu) elif message_type == Message.MessageType.CONTROL: if param == Message.ControlCommand.VIRTUAL_CABLE_UNPLUG: logger.debug('<<< HID VIRTUAL CABLE UNPLUG') - self.emit('virtual_cable_unplug') + self.emit(self.EVENT_VIRTUAL_CABLE_UNPLUG) else: logger.debug('<<< HID CONTROL OPERATION UNSUPPORTED') else: diff --git a/bumble/l2cap.py b/bumble/l2cap.py index c8bbabe..08d8edf 100644 --- a/bumble/l2cap.py +++ b/bumble/l2cap.py @@ -744,6 +744,9 @@ class ClassicChannel(utils.EventEmitter): WAIT_FINAL_RSP = 0x16 WAIT_CONTROL_IND = 0x17 + EVENT_OPEN = "open" + EVENT_CLOSE = "close" + connection_result: Optional[asyncio.Future[None]] disconnection_result: Optional[asyncio.Future[None]] response: Optional[asyncio.Future[bytes]] @@ -847,7 +850,7 @@ class ClassicChannel(utils.EventEmitter): def abort(self) -> None: if self.state == self.State.OPEN: self._change_state(self.State.CLOSED) - self.emit('close') + self.emit(self.EVENT_CLOSE) def send_configure_request(self) -> None: options = L2CAP_Control_Frame.encode_configuration_options( @@ -940,7 +943,7 @@ class ClassicChannel(utils.EventEmitter): if self.connection_result: self.connection_result.set_result(None) self.connection_result = None - self.emit('open') + self.emit(self.EVENT_OPEN) elif self.state == self.State.WAIT_CONFIG_REQ_RSP: self._change_state(self.State.WAIT_CONFIG_RSP) @@ -956,7 +959,7 @@ class ClassicChannel(utils.EventEmitter): if self.connection_result: self.connection_result.set_result(None) self.connection_result = None - self.emit('open') + self.emit(self.EVENT_OPEN) else: logger.warning(color('invalid state', 'red')) elif ( @@ -991,7 +994,7 @@ class ClassicChannel(utils.EventEmitter): ) ) self._change_state(self.State.CLOSED) - self.emit('close') + self.emit(self.EVENT_CLOSE) self.manager.on_channel_closed(self) else: logger.warning(color('invalid state', 'red')) @@ -1012,7 +1015,7 @@ class ClassicChannel(utils.EventEmitter): if self.disconnection_result: self.disconnection_result.set_result(None) self.disconnection_result = None - self.emit('close') + self.emit(self.EVENT_CLOSE) self.manager.on_channel_closed(self) def __str__(self) -> str: @@ -1047,6 +1050,9 @@ class LeCreditBasedChannel(utils.EventEmitter): connection: Connection sink: Optional[Callable[[bytes], Any]] + EVENT_OPEN = "open" + EVENT_CLOSE = "close" + def __init__( self, manager: ChannelManager, @@ -1098,9 +1104,9 @@ class LeCreditBasedChannel(utils.EventEmitter): self.state = new_state if new_state == self.State.CONNECTED: - self.emit('open') + self.emit(self.EVENT_OPEN) elif new_state == self.State.DISCONNECTED: - self.emit('close') + self.emit(self.EVENT_CLOSE) def send_pdu(self, pdu: Union[SupportsBytes, bytes]) -> None: self.manager.send_pdu(self.connection, self.destination_cid, pdu) @@ -1381,6 +1387,8 @@ class LeCreditBasedChannel(utils.EventEmitter): # ----------------------------------------------------------------------------- class ClassicChannelServer(utils.EventEmitter): + EVENT_CONNECTION = "connection" + def __init__( self, manager: ChannelManager, @@ -1395,7 +1403,7 @@ class ClassicChannelServer(utils.EventEmitter): self.mtu = mtu def on_connection(self, channel: ClassicChannel) -> None: - self.emit('connection', channel) + self.emit(self.EVENT_CONNECTION, channel) if self.handler: self.handler(channel) @@ -1406,6 +1414,8 @@ class ClassicChannelServer(utils.EventEmitter): # ----------------------------------------------------------------------------- class LeCreditBasedChannelServer(utils.EventEmitter): + EVENT_CONNECTION = "connection" + def __init__( self, manager: ChannelManager, @@ -1424,7 +1434,7 @@ class LeCreditBasedChannelServer(utils.EventEmitter): self.mps = mps def on_connection(self, channel: LeCreditBasedChannel) -> None: - self.emit('connection', channel) + self.emit(self.EVENT_CONNECTION, channel) if self.handler: self.handler(channel) diff --git a/bumble/pandora/host.py b/bumble/pandora/host.py index 0390170..515c1d5 100644 --- a/bumble/pandora/host.py +++ b/bumble/pandora/host.py @@ -296,12 +296,12 @@ class HostService(HostServicer): def on_disconnection(_: None) -> None: disconnection_future.set_result(None) - connection.on('disconnection', on_disconnection) + connection.on(connection.EVENT_DISCONNECTION, on_disconnection) try: await disconnection_future self.log.debug("Disconnected") finally: - connection.remove_listener('disconnection', on_disconnection) # type: ignore + connection.remove_listener(connection.EVENT_DISCONNECTION, on_disconnection) # type: ignore return empty_pb2.Empty() @@ -383,7 +383,7 @@ class HostService(HostServicer): ): connections.put_nowait(connection) - self.device.on('connection', on_connection) + self.device.on(self.device.EVENT_CONNECTION, on_connection) try: # Advertise until RPC is canceled @@ -501,7 +501,7 @@ class HostService(HostServicer): ): connections.put_nowait(connection) - self.device.on('connection', on_connection) + self.device.on(self.device.EVENT_CONNECTION, on_connection) try: while True: @@ -531,7 +531,7 @@ class HostService(HostServicer): await asyncio.sleep(1) finally: if request.connectable: - self.device.remove_listener('connection', on_connection) # type: ignore + self.device.remove_listener(self.device.EVENT_CONNECTION, on_connection) # type: ignore try: self.log.debug('Stop advertising') @@ -557,7 +557,7 @@ class HostService(HostServicer): scanning_phys = [int(Phy.LE_1M), int(Phy.LE_CODED)] scan_queue: asyncio.Queue[Advertisement] = asyncio.Queue() - handler = self.device.on('advertisement', scan_queue.put_nowait) + handler = self.device.on(self.device.EVENT_ADVERTISEMENT, scan_queue.put_nowait) await self.device.start_scanning( legacy=request.legacy, active=not request.passive, @@ -602,7 +602,7 @@ class HostService(HostServicer): yield sr finally: - self.device.remove_listener('advertisement', handler) # type: ignore + self.device.remove_listener(self.device.EVENT_ADVERTISEMENT, handler) # type: ignore try: self.log.debug('Stop scanning') await bumble.utils.cancel_on_event( @@ -621,10 +621,10 @@ class HostService(HostServicer): Optional[Tuple[Address, int, AdvertisingData, int]] ] = asyncio.Queue() complete_handler = self.device.on( - 'inquiry_complete', lambda: inquiry_queue.put_nowait(None) + self.device.EVENT_INQUIRY_COMPLETE, lambda: inquiry_queue.put_nowait(None) ) result_handler = self.device.on( # type: ignore - 'inquiry_result', + self.device.EVENT_INQUIRY_RESULT, lambda address, class_of_device, eir_data, rssi: inquiry_queue.put_nowait( # type: ignore (address, class_of_device, eir_data, rssi) # type: ignore ), @@ -643,8 +643,8 @@ class HostService(HostServicer): ) finally: - self.device.remove_listener('inquiry_complete', complete_handler) # type: ignore - self.device.remove_listener('inquiry_result', result_handler) # type: ignore + self.device.remove_listener(self.device.EVENT_INQUIRY_COMPLETE, complete_handler) # type: ignore + self.device.remove_listener(self.device.EVENT_INQUIRY_RESULT, result_handler) # type: ignore try: self.log.debug('Stop inquiry') await bumble.utils.cancel_on_event( diff --git a/bumble/pandora/l2cap.py b/bumble/pandora/l2cap.py index df52b50..94b042c 100644 --- a/bumble/pandora/l2cap.py +++ b/bumble/pandora/l2cap.py @@ -83,7 +83,7 @@ class L2CAPService(L2CAPServicer): close_future.set_result(None) l2cap_channel.sink = on_channel_sdu - l2cap_channel.on('close', on_close) + l2cap_channel.on(l2cap_channel.EVENT_CLOSE, on_close) return ChannelContext(close_future, sdu_queue) @@ -151,7 +151,7 @@ class L2CAPService(L2CAPServicer): spec=spec, handler=on_l2cap_channel ) else: - l2cap_server.on('connection', on_l2cap_channel) + l2cap_server.on(l2cap_server.EVENT_CONNECTION, on_l2cap_channel) try: self.log.debug('Waiting for a channel connection.') diff --git a/bumble/pandora/security.py b/bumble/pandora/security.py index 947587e..ca3102d 100644 --- a/bumble/pandora/security.py +++ b/bumble/pandora/security.py @@ -302,15 +302,15 @@ class SecurityService(SecurityServicer): with contextlib.closing(bumble.utils.EventWatcher()) as watcher: - @watcher.on(connection, 'pairing') + @watcher.on(connection, connection.EVENT_PAIRING) def on_pairing(*_: Any) -> None: security_result.set_result('success') - @watcher.on(connection, 'pairing_failure') + @watcher.on(connection, connection.EVENT_PAIRING_FAILURE) def on_pairing_failure(*_: Any) -> None: security_result.set_result('pairing_failure') - @watcher.on(connection, 'disconnection') + @watcher.on(connection, connection.EVENT_DISCONNECTION) def on_disconnection(*_: Any) -> None: security_result.set_result('connection_died') diff --git a/bumble/profiles/ancs.py b/bumble/profiles/ancs.py index 8ea5aaa..6bf93f1 100644 --- a/bumble/profiles/ancs.py +++ b/bumble/profiles/ancs.py @@ -250,6 +250,8 @@ class AncsClient(utils.EventEmitter): _expected_response_tuples: int _response_accumulator: bytes + EVENT_NOTIFICATION = "notification" + def __init__(self, ancs_proxy: AncsProxy) -> None: super().__init__() self._ancs_proxy = ancs_proxy @@ -284,7 +286,7 @@ class AncsClient(utils.EventEmitter): def _on_notification(self, notification: Notification) -> None: logger.debug(f"ANCS NOTIFICATION: {notification}") - self.emit("notification", notification) + self.emit(self.EVENT_NOTIFICATION, notification) def _on_data(self, data: bytes) -> None: logger.debug(f"ANCS DATA: {data.hex()}") diff --git a/bumble/profiles/ascs.py b/bumble/profiles/ascs.py index cab70da..2b00160 100644 --- a/bumble/profiles/ascs.py +++ b/bumble/profiles/ascs.py @@ -276,6 +276,8 @@ class AseStateMachine(gatt.Characteristic): DISABLING = 0x05 RELEASING = 0x06 + EVENT_STATE_CHANGE = "state_change" + cis_link: Optional[device.CisLink] = None # Additional parameters in CODEC_CONFIGURED State @@ -329,8 +331,12 @@ class AseStateMachine(gatt.Characteristic): value=gatt.CharacteristicValue(read=self.on_read), ) - self.service.device.on('cis_request', self.on_cis_request) - self.service.device.on('cis_establishment', self.on_cis_establishment) + self.service.device.on( + self.service.device.EVENT_CIS_REQUEST, self.on_cis_request + ) + self.service.device.on( + self.service.device.EVENT_CIS_ESTABLISHMENT, self.on_cis_establishment + ) def on_cis_request( self, @@ -356,7 +362,7 @@ class AseStateMachine(gatt.Characteristic): and cis_link.cis_id == self.cis_id and self.state == self.State.ENABLING ): - cis_link.on('disconnection', self.on_cis_disconnection) + cis_link.on(cis_link.EVENT_DISCONNECTION, self.on_cis_disconnection) async def post_cis_established(): await cis_link.setup_data_path(direction=self.role) @@ -525,7 +531,7 @@ class AseStateMachine(gatt.Characteristic): def state(self, new_state: State) -> None: logger.debug(f'{self} state change -> {colors.color(new_state.name, "cyan")}') self._state = new_state - self.emit('state_change') + self.emit(self.EVENT_STATE_CHANGE) @property def value(self): diff --git a/bumble/profiles/asha.py b/bumble/profiles/asha.py index 7422d53..9166802 100644 --- a/bumble/profiles/asha.py +++ b/bumble/profiles/asha.py @@ -88,6 +88,11 @@ class AudioStatus(utils.OpenIntEnum): class AshaService(gatt.TemplateService): UUID = gatt.GATT_ASHA_SERVICE + EVENT_STARTED = "started" + EVENT_STOPPED = "stopped" + EVENT_DISCONNECTED = "disconnected" + EVENT_VOLUME_CHANGED = "volume_changed" + audio_sink: Optional[Callable[[bytes], Any]] active_codec: Optional[Codec] = None audio_type: Optional[AudioType] = None @@ -211,14 +216,14 @@ class AshaService(gatt.TemplateService): f'volume={self.volume}, ' f'other_state={self.other_state}' ) - self.emit('started') + self.emit(self.EVENT_STARTED) elif opcode == OpCode.STOP: _logger.debug('### STOP') self.active_codec = None self.audio_type = None self.volume = None self.other_state = None - self.emit('stopped') + self.emit(self.EVENT_STOPPED) elif opcode == OpCode.STATUS: _logger.debug('### STATUS: %s', PeripheralStatus(value[1]).name) @@ -231,7 +236,7 @@ class AshaService(gatt.TemplateService): self.audio_type = None self.volume = None self.other_state = None - self.emit('disconnected') + self.emit(self.EVENT_DISCONNECTED) connection.once('disconnection', on_disconnection) @@ -245,7 +250,7 @@ class AshaService(gatt.TemplateService): def _on_volume_write(self, connection: Optional[Connection], value: bytes) -> None: _logger.debug(f'--- VOLUME Write:{value[0]}') self.volume = value[0] - self.emit('volume_changed') + self.emit(self.EVENT_VOLUME_CHANGED) # Register an L2CAP CoC server def _on_connection(self, channel: l2cap.LeCreditBasedChannel) -> None: diff --git a/bumble/profiles/hap.py b/bumble/profiles/hap.py index 655cc6a..0074818 100644 --- a/bumble/profiles/hap.py +++ b/bumble/profiles/hap.py @@ -266,13 +266,13 @@ class HearingAccessService(gatt.TemplateService): # associate the lowest index as the current active preset at startup self.active_preset_index = sorted(self.preset_records.keys())[0] - @device.on('connection') # type: ignore + @device.on(device.EVENT_CONNECTION) def on_connection(connection: Connection) -> None: - @connection.on('disconnection') # type: ignore + @connection.on(connection.EVENT_DISCONNECTION) def on_disconnection(_reason) -> None: self.currently_connected_clients.remove(connection) - @connection.on('pairing') # type: ignore + @connection.on(connection.EVENT_PAIRING) def on_pairing(*_: Any) -> None: self.on_incoming_paired_connection(connection) diff --git a/bumble/profiles/mcp.py b/bumble/profiles/mcp.py index abd35e2..68cac93 100644 --- a/bumble/profiles/mcp.py +++ b/bumble/profiles/mcp.py @@ -338,6 +338,12 @@ class MediaControlServiceProxy( 'content_control_id': gatt.GATT_CONTENT_CONTROL_ID_CHARACTERISTIC, } + EVENT_MEDIA_STATE = "media_state" + EVENT_TRACK_CHANGED = "track_changed" + EVENT_TRACK_TITLE = "track_title" + EVENT_TRACK_DURATION = "track_duration" + EVENT_TRACK_POSITION = "track_position" + media_player_name: Optional[gatt_client.CharacteristicProxy[bytes]] = None media_player_icon_object_id: Optional[gatt_client.CharacteristicProxy[bytes]] = None media_player_icon_url: Optional[gatt_client.CharacteristicProxy[bytes]] = None @@ -432,20 +438,20 @@ class MediaControlServiceProxy( self.media_control_point_notifications.put_nowait(data) def _on_media_state(self, data: bytes) -> None: - self.emit('media_state', MediaState(data[0])) + self.emit(self.EVENT_MEDIA_STATE, MediaState(data[0])) def _on_track_changed(self, data: bytes) -> None: del data - self.emit('track_changed') + self.emit(self.EVENT_TRACK_CHANGED) def _on_track_title(self, data: bytes) -> None: - self.emit('track_title', data.decode("utf-8")) + self.emit(self.EVENT_TRACK_TITLE, data.decode("utf-8")) def _on_track_duration(self, data: bytes) -> None: - self.emit('track_duration', struct.unpack_from(' None: - self.emit('track_position', struct.unpack_from(' bool: old_volume = self.volume_setting diff --git a/bumble/rfcomm.py b/bumble/rfcomm.py index 1fbb24e..fe5038b 100644 --- a/bumble/rfcomm.py +++ b/bumble/rfcomm.py @@ -442,6 +442,9 @@ class RFCOMM_MCC_MSC: # ----------------------------------------------------------------------------- class DLC(utils.EventEmitter): + EVENT_OPEN = "open" + EVENT_CLOSE = "close" + class State(enum.IntEnum): INIT = 0x00 CONNECTING = 0x01 @@ -529,7 +532,7 @@ class DLC(utils.EventEmitter): self.send_frame(RFCOMM_Frame.uih(c_r=self.c_r, dlci=0, information=mcc)) self.change_state(DLC.State.CONNECTED) - self.emit('open') + self.emit(self.EVENT_OPEN) def on_ua_frame(self, _frame: RFCOMM_Frame) -> None: if self.state == DLC.State.CONNECTING: @@ -550,7 +553,7 @@ class DLC(utils.EventEmitter): self.disconnection_result.set_result(None) self.disconnection_result = None self.multiplexer.on_dlc_disconnection(self) - self.emit('close') + self.emit(self.EVENT_CLOSE) else: logger.warning( color( @@ -733,7 +736,7 @@ class DLC(utils.EventEmitter): self.disconnection_result.cancel() self.disconnection_result = None self.change_state(DLC.State.RESET) - self.emit('close') + self.emit(self.EVENT_CLOSE) def __str__(self) -> str: return ( @@ -763,6 +766,8 @@ class Multiplexer(utils.EventEmitter): DISCONNECTED = 0x05 RESET = 0x06 + EVENT_DLC = "dlc" + connection_result: Optional[asyncio.Future] disconnection_result: Optional[asyncio.Future] open_result: Optional[asyncio.Future] @@ -785,7 +790,7 @@ class Multiplexer(utils.EventEmitter): # Become a sink for the L2CAP channel l2cap_channel.sink = self.on_pdu - l2cap_channel.on('close', self.on_l2cap_channel_close) + l2cap_channel.on(l2cap_channel.EVENT_CLOSE, self.on_l2cap_channel_close) def change_state(self, new_state: State) -> None: logger.debug(f'{self} state change -> {color(new_state.name, "cyan")}') @@ -901,7 +906,7 @@ class Multiplexer(utils.EventEmitter): self.dlcs[pn.dlci] = dlc # Re-emit the handshake completion event - dlc.on('open', lambda: self.emit('dlc', dlc)) + dlc.on(dlc.EVENT_OPEN, lambda: self.emit(self.EVENT_DLC, dlc)) # Respond to complete the handshake dlc.accept() @@ -1076,6 +1081,8 @@ class Client: # ----------------------------------------------------------------------------- class Server(utils.EventEmitter): + EVENT_START = "start" + def __init__( self, device: Device, l2cap_mtu: int = RFCOMM_DEFAULT_L2CAP_MTU ) -> None: @@ -1122,7 +1129,9 @@ class Server(utils.EventEmitter): def on_connection(self, l2cap_channel: l2cap.ClassicChannel) -> None: logger.debug(f'+++ new L2CAP connection: {l2cap_channel}') - l2cap_channel.on('open', lambda: self.on_l2cap_channel_open(l2cap_channel)) + l2cap_channel.on( + l2cap_channel.EVENT_OPEN, lambda: self.on_l2cap_channel_open(l2cap_channel) + ) def on_l2cap_channel_open(self, l2cap_channel: l2cap.ClassicChannel) -> None: logger.debug(f'$$$ L2CAP channel open: {l2cap_channel}') @@ -1130,10 +1139,10 @@ class Server(utils.EventEmitter): # Create a new multiplexer for the channel multiplexer = Multiplexer(l2cap_channel, Multiplexer.Role.RESPONDER) multiplexer.acceptor = self.accept_dlc - multiplexer.on('dlc', self.on_dlc) + multiplexer.on(multiplexer.EVENT_DLC, self.on_dlc) # Notify - self.emit('start', multiplexer) + self.emit(self.EVENT_START, multiplexer) def accept_dlc(self, channel_number: int) -> Optional[Tuple[int, int]]: return self.dlc_configs.get(channel_number) diff --git a/bumble/smp.py b/bumble/smp.py index d98565c..79579d3 100644 --- a/bumble/smp.py +++ b/bumble/smp.py @@ -724,12 +724,13 @@ class Session: self.is_responder = not self.is_initiator # Listen for connection events - connection.on('disconnection', self.on_disconnection) + connection.on(connection.EVENT_DISCONNECTION, self.on_disconnection) connection.on( - 'connection_encryption_change', self.on_connection_encryption_change + connection.EVENT_CONNECTION_ENCRYPTION_CHANGE, + self.on_connection_encryption_change, ) connection.on( - 'connection_encryption_key_refresh', + connection.EVENT_CONNECTION_ENCRYPTION_KEY_REFRESH, self.on_connection_encryption_key_refresh, ) @@ -1310,12 +1311,15 @@ class Session: ) def on_disconnection(self, _: int) -> None: - self.connection.remove_listener('disconnection', self.on_disconnection) self.connection.remove_listener( - 'connection_encryption_change', self.on_connection_encryption_change + self.connection.EVENT_DISCONNECTION, self.on_disconnection ) self.connection.remove_listener( - 'connection_encryption_key_refresh', + self.connection.EVENT_CONNECTION_ENCRYPTION_CHANGE, + self.on_connection_encryption_change, + ) + self.connection.remove_listener( + self.connection.EVENT_CONNECTION_ENCRYPTION_KEY_REFRESH, self.on_connection_encryption_key_refresh, ) self.manager.on_session_end(self) @@ -1962,7 +1966,7 @@ class Manager(utils.EventEmitter): def on_smp_security_request_command( self, connection: Connection, request: SMP_Security_Request_Command ) -> None: - connection.emit('security_request', request.auth_req) + connection.emit(connection.EVENT_SECURITY_REQUEST, request.auth_req) def on_smp_pdu(self, connection: Connection, pdu: bytes) -> None: # Parse the L2CAP payload into an SMP Command object