Compare commits

...

2 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod
623298b0e9 emit flush event when transport lost 2023-08-18 09:59:15 -07:00
Gilles Boccon-Gibod
2bfec3c4ed add sink method for lost transports 2023-08-12 10:54:20 -07:00
4 changed files with 53 additions and 24 deletions

View File

@@ -188,6 +188,8 @@ class Controller:
if link:
link.add_controller(self)
self.terminated = asyncio.get_running_loop().create_future()
@property
def host(self):
return self.hci_sink
@@ -288,10 +290,9 @@ class Controller:
if self.host:
self.host.on_packet(packet.to_bytes())
# This method allow the controller to emulate the same API as a transport source
# This method allows the controller to emulate the same API as a transport source
async def wait_for_termination(self):
# For now, just wait forever
await asyncio.get_running_loop().create_future()
await self.terminated
############################################################
# Link connections

View File

@@ -20,13 +20,13 @@ import collections
import logging
import struct
from typing import Optional
from bumble.colors import color
from bumble.l2cap import L2CAP_PDU
from bumble.snoop import Snooper
from bumble import drivers
from typing import Optional
from .hci import (
Address,
HCI_ACL_DATA_PACKET,
@@ -63,16 +63,15 @@ from .hci import (
HCI_Read_Local_Version_Information_Command,
HCI_Reset_Command,
HCI_Set_Event_Mask_Command,
map_null_terminated_utf8_string,
)
from .core import (
BT_BR_EDR_TRANSPORT,
BT_CENTRAL_ROLE,
BT_LE_TRANSPORT,
ConnectionPHY,
ConnectionParameters,
)
from .utils import AbortableEventEmitter
from .transport.common import TransportLostError
# -----------------------------------------------------------------------------
@@ -349,7 +348,7 @@ class Host(AbortableEventEmitter):
return response
except Exception as error:
logger.warning(
f'{color("!!! Exception while sending HCI packet:", "red")} {error}'
f'{color("!!! Exception while sending command:", "red")} {error}'
)
raise error
finally:
@@ -455,6 +454,13 @@ class Host(AbortableEventEmitter):
else:
logger.debug('reset not done, ignoring packet from controller')
def on_transport_lost(self):
# Called by the source when the transport has been lost.
if self.pending_response:
self.pending_response.set_exception(TransportLostError('transport lost'))
self.emit('flush')
def on_hci_packet(self, packet):
logger.debug(f'{color("### CONTROLLER -> HOST", "green")}: {packet}')

View File

@@ -44,11 +44,18 @@ HCI_PACKET_INFO = {
}
# -----------------------------------------------------------------------------
class TransportLostError(Exception):
"""
The Transport has been lost/disconnected.
"""
# -----------------------------------------------------------------------------
class PacketPump:
'''
Pump HCI packets from a reader to a sink
'''
"""
Pump HCI packets from a reader to a sink.
"""
def __init__(self, reader, sink):
self.reader = reader
@@ -68,10 +75,10 @@ class PacketPump:
# -----------------------------------------------------------------------------
class PacketParser:
'''
"""
In-line parser that accepts data and emits 'on_packet' when a full packet has been
parsed
'''
parsed.
"""
# pylint: disable=attribute-defined-outside-init
@@ -134,9 +141,9 @@ class PacketParser:
# -----------------------------------------------------------------------------
class PacketReader:
'''
Reader that reads HCI packets from a sync source
'''
"""
Reader that reads HCI packets from a sync source.
"""
def __init__(self, source):
self.source = source
@@ -169,9 +176,9 @@ class PacketReader:
# -----------------------------------------------------------------------------
class AsyncPacketReader:
'''
Reader that reads HCI packets from an async source
'''
"""
Reader that reads HCI packets from an async source.
"""
def __init__(self, source):
self.source = source
@@ -198,9 +205,9 @@ class AsyncPacketReader:
# -----------------------------------------------------------------------------
class AsyncPipeSink:
'''
Sink that forwards packets asynchronously to another sink
'''
"""
Sink that forwards packets asynchronously to another sink.
"""
def __init__(self, sink):
self.sink = sink
@@ -216,6 +223,9 @@ class ParserSource:
Base class designed to be subclassed by transport-specific source classes
"""
terminated: asyncio.Future
parser: PacketParser
def __init__(self):
self.parser = PacketParser()
self.terminated = asyncio.get_running_loop().create_future()
@@ -223,7 +233,19 @@ class ParserSource:
def set_packet_sink(self, sink):
self.parser.set_packet_sink(sink)
def on_transport_lost(self):
self.terminated.set_result(None)
if self.parser.sink:
try:
self.parser.sink.on_transport_lost()
except AttributeError:
pass
async def wait_for_termination(self):
"""
Convenience method for backward compatibility. Prefer using the `terminated`
attribute instead.
"""
return await self.terminated
def close(self):

View File

@@ -39,7 +39,7 @@ async def open_tcp_client_transport(spec):
class TcpPacketSource(StreamPacketSource):
def connection_lost(self, exc):
logger.debug(f'connection lost: {exc}')
self.terminated.set_result(exc)
self.on_transport_lost()
remote_host, remote_port = spec.split(':')
tcp_transport, packet_source = await asyncio.get_running_loop().create_connection(