add sink method for lost transports

This commit is contained in:
Gilles Boccon-Gibod
2023-08-12 10:54:20 -07:00
parent fe28473ba8
commit 2bfec3c4ed
4 changed files with 51 additions and 24 deletions

View File

@@ -188,6 +188,8 @@ class Controller:
if link: if link:
link.add_controller(self) link.add_controller(self)
self.terminated = asyncio.get_running_loop().create_future()
@property @property
def host(self): def host(self):
return self.hci_sink return self.hci_sink
@@ -288,10 +290,9 @@ class Controller:
if self.host: if self.host:
self.host.on_packet(packet.to_bytes()) self.host.on_packet(packet.to_bytes())
# This method allow the controller to emulate the same API as a transport source # This method allows the controller to emulate the same API as a transport source
async def wait_for_termination(self): async def wait_for_termination(self):
# For now, just wait forever await self.terminated
await asyncio.get_running_loop().create_future()
############################################################ ############################################################
# Link connections # Link connections

View File

@@ -20,13 +20,13 @@ import collections
import logging import logging
import struct import struct
from typing import Optional
from bumble.colors import color from bumble.colors import color
from bumble.l2cap import L2CAP_PDU from bumble.l2cap import L2CAP_PDU
from bumble.snoop import Snooper from bumble.snoop import Snooper
from bumble import drivers from bumble import drivers
from typing import Optional
from .hci import ( from .hci import (
Address, Address,
HCI_ACL_DATA_PACKET, HCI_ACL_DATA_PACKET,
@@ -63,16 +63,15 @@ from .hci import (
HCI_Read_Local_Version_Information_Command, HCI_Read_Local_Version_Information_Command,
HCI_Reset_Command, HCI_Reset_Command,
HCI_Set_Event_Mask_Command, HCI_Set_Event_Mask_Command,
map_null_terminated_utf8_string,
) )
from .core import ( from .core import (
BT_BR_EDR_TRANSPORT, BT_BR_EDR_TRANSPORT,
BT_CENTRAL_ROLE,
BT_LE_TRANSPORT, BT_LE_TRANSPORT,
ConnectionPHY, ConnectionPHY,
ConnectionParameters, ConnectionParameters,
) )
from .utils import AbortableEventEmitter from .utils import AbortableEventEmitter
from .transport.common import TransportLostError
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -349,7 +348,7 @@ class Host(AbortableEventEmitter):
return response return response
except Exception as error: except Exception as error:
logger.warning( logger.warning(
f'{color("!!! Exception while sending HCI packet:", "red")} {error}' f'{color("!!! Exception while sending command:", "red")} {error}'
) )
raise error raise error
finally: finally:
@@ -455,6 +454,11 @@ class Host(AbortableEventEmitter):
else: else:
logger.debug('reset not done, ignoring packet from controller') logger.debug('reset not done, ignoring packet from controller')
def on_transport_lost(self):
# Called by the source when the transport has been lost.
if self.pending_response:
self.pending_response.set_exception(TransportLostError('transport lost'))
def on_hci_packet(self, packet): def on_hci_packet(self, packet):
logger.debug(f'{color("### CONTROLLER -> HOST", "green")}: {packet}') logger.debug(f'{color("### CONTROLLER -> HOST", "green")}: {packet}')

View File

@@ -44,11 +44,18 @@ HCI_PACKET_INFO = {
} }
# -----------------------------------------------------------------------------
class TransportLostError(Exception):
"""
The Transport has been lost/disconnected.
"""
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class PacketPump: class PacketPump:
''' """
Pump HCI packets from a reader to a sink Pump HCI packets from a reader to a sink.
''' """
def __init__(self, reader, sink): def __init__(self, reader, sink):
self.reader = reader self.reader = reader
@@ -68,10 +75,10 @@ class PacketPump:
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class PacketParser: class PacketParser:
''' """
In-line parser that accepts data and emits 'on_packet' when a full packet has been In-line parser that accepts data and emits 'on_packet' when a full packet has been
parsed parsed.
''' """
# pylint: disable=attribute-defined-outside-init # pylint: disable=attribute-defined-outside-init
@@ -134,9 +141,9 @@ class PacketParser:
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class PacketReader: class PacketReader:
''' """
Reader that reads HCI packets from a sync source Reader that reads HCI packets from a sync source.
''' """
def __init__(self, source): def __init__(self, source):
self.source = source self.source = source
@@ -169,9 +176,9 @@ class PacketReader:
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class AsyncPacketReader: class AsyncPacketReader:
''' """
Reader that reads HCI packets from an async source Reader that reads HCI packets from an async source.
''' """
def __init__(self, source): def __init__(self, source):
self.source = source self.source = source
@@ -198,9 +205,9 @@ class AsyncPacketReader:
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class AsyncPipeSink: class AsyncPipeSink:
''' """
Sink that forwards packets asynchronously to another sink Sink that forwards packets asynchronously to another sink.
''' """
def __init__(self, sink): def __init__(self, sink):
self.sink = sink self.sink = sink
@@ -216,6 +223,9 @@ class ParserSource:
Base class designed to be subclassed by transport-specific source classes Base class designed to be subclassed by transport-specific source classes
""" """
terminated: asyncio.Future
parser: PacketParser
def __init__(self): def __init__(self):
self.parser = PacketParser() self.parser = PacketParser()
self.terminated = asyncio.get_running_loop().create_future() self.terminated = asyncio.get_running_loop().create_future()
@@ -223,7 +233,19 @@ class ParserSource:
def set_packet_sink(self, sink): def set_packet_sink(self, sink):
self.parser.set_packet_sink(sink) self.parser.set_packet_sink(sink)
def on_transport_lost(self):
self.terminated.set_result(None)
if self.parser.sink:
try:
self.parser.sink.on_transport_lost()
except AttributeError:
pass
async def wait_for_termination(self): async def wait_for_termination(self):
"""
Convenience method for backward compatibility. Prefer using the `terminated`
attribute instead.
"""
return await self.terminated return await self.terminated
def close(self): def close(self):

View File

@@ -39,7 +39,7 @@ async def open_tcp_client_transport(spec):
class TcpPacketSource(StreamPacketSource): class TcpPacketSource(StreamPacketSource):
def connection_lost(self, exc): def connection_lost(self, exc):
logger.debug(f'connection lost: {exc}') logger.debug(f'connection lost: {exc}')
self.terminated.set_result(exc) self.on_transport_lost()
remote_host, remote_port = spec.split(':') remote_host, remote_port = spec.split(':')
tcp_transport, packet_source = await asyncio.get_running_loop().create_connection( tcp_transport, packet_source = await asyncio.get_running_loop().create_connection(