diff --git a/apps/bench.py b/apps/bench.py index a98adc4..4832088 100644 --- a/apps/bench.py +++ b/apps/bench.py @@ -82,10 +82,11 @@ SPEED_RX_UUID = '016A2CC7-E14B-4819-935F-1F56EAE4098D' DEFAULT_RFCOMM_UUID = 'E6D55659-C8B4-4B85-96BB-B1143AF6D3AE' DEFAULT_L2CAP_PSM = 1234 DEFAULT_L2CAP_MAX_CREDITS = 128 -DEFAULT_L2CAP_MTU = 1022 -DEFAULT_L2CAP_MPS = 1024 +DEFAULT_L2CAP_MTU = 1024 +DEFAULT_L2CAP_MPS = 1022 DEFAULT_LINGER_TIME = 1.0 +DEFAULT_POST_CONNECTION_WAIT_TIME = 1.0 DEFAULT_RFCOMM_CHANNEL = 8 @@ -952,6 +953,10 @@ class Central(Connection.Listener): await self.device.power_on() + if self.classic: + await self.device.set_discoverable(False) + await self.device.set_connectable(False) + print(color(f'### Connecting to {self.peripheral_address}...', 'cyan')) try: self.connection = await self.device.connect( @@ -972,6 +977,11 @@ class Central(Connection.Listener): self.connection.listener = self print_connection(self.connection) + # Wait a bit after the connection, some controllers aren't very good when + # we start sending data right away while some connection parameters are + # updated post connection + await asyncio.sleep(DEFAULT_POST_CONNECTION_WAIT_TIME) + # Request a new data length if requested if self.extended_data_length: print(color('+++ Requesting extended data length', 'cyan')) @@ -1098,6 +1108,13 @@ class Peripheral(Device.Listener, Connection.Listener): self.connection = connection self.connected.set() + # Stop being discoverable and connectable + if self.classic: + async def stop_being_discoverable_connectable(): + await self.device.set_discoverable(False) + await self.device.set_connectable(False) + AsyncRunner.spawn(stop_being_discoverable_connectable()) + # Request a new data length if needed if self.extended_data_length: print("+++ Requesting extended data length") diff --git a/bumble/host.py b/bumble/host.py index 190ab89..052a2c7 100644 --- a/bumble/host.py +++ b/bumble/host.py @@ -21,7 +21,7 @@ import collections import logging import struct -from typing import Any, Awaitable, Callable, Dict, Optional, Union, cast, TYPE_CHECKING +from typing import Any, Awaitable, Callable, Deque, Dict, Optional, cast, TYPE_CHECKING from bumble.colors import color from bumble.l2cap import L2CAP_PDU @@ -91,16 +91,49 @@ logger = logging.getLogger(__name__) # ----------------------------------------------------------------------------- -# Constants -# ----------------------------------------------------------------------------- -# fmt: off +class AclPacketQueue: + max_packet_size: int -HOST_DEFAULT_HC_LE_ACL_DATA_PACKET_LENGTH = 27 -HOST_HC_TOTAL_NUM_LE_ACL_DATA_PACKETS = 1 -HOST_DEFAULT_HC_ACL_DATA_PACKET_LENGTH = 27 -HOST_HC_TOTAL_NUM_ACL_DATA_PACKETS = 1 + def __init__( + self, + max_packet_size: int, + max_in_flight: int, + send: Callable[[HCI_Packet], None], + ) -> None: + self.max_packet_size = max_packet_size + self.max_in_flight = max_in_flight + self.in_flight = 0 + self.send = send + self.packets: Deque[HCI_AclDataPacket] = collections.deque() -# fmt: on + def enqueue(self, packet: HCI_AclDataPacket) -> None: + self.packets.appendleft(packet) + self.check_queue() + + if len(self.packets): + logger.debug( + f'{self.in_flight} ACL packets in flight, ' + f'{len(self.packets)} in queue' + ) + + def check_queue(self) -> None: + while self.packets and self.in_flight < self.max_in_flight: + packet = self.packets.pop() + self.send(packet) + self.in_flight += 1 + + def on_packets_completed(self, packet_count: int) -> None: + if packet_count > self.in_flight: + logger.warning( + color( + '!!! {packet_count} completed but only ' + f'{self.in_flight} in flight' + ) + ) + packet_count = self.in_flight + + self.in_flight -= packet_count + self.check_queue() # ----------------------------------------------------------------------------- @@ -111,6 +144,13 @@ class Connection: self.peer_address = peer_address self.assembler = HCI_AclDataPacketAssembler(self.on_acl_pdu) self.transport = transport + acl_packet_queue: Optional[AclPacketQueue] = ( + host.le_acl_packet_queue + if transport == BT_LE_TRANSPORT + else host.acl_packet_queue + ) + assert acl_packet_queue + self.acl_packet_queue = acl_packet_queue def on_hci_acl_data_packet(self, packet: HCI_AclDataPacket) -> None: self.assembler.feed_packet(packet) @@ -123,7 +163,8 @@ class Connection: # ----------------------------------------------------------------------------- class Host(AbortableEventEmitter): connections: Dict[int, Connection] - acl_packet_queue: collections.deque[HCI_AclDataPacket] + acl_packet_queue: Optional[AclPacketQueue] = None + le_acl_packet_queue: Optional[AclPacketQueue] = None hci_sink: Optional[TransportSink] = None hci_metadata: Dict[str, Any] long_term_key_provider: Optional[ @@ -143,12 +184,6 @@ class Host(AbortableEventEmitter): self.connections = {} # Connections, by connection handle self.pending_command = None self.pending_response = None - self.hc_le_acl_data_packet_length = HOST_DEFAULT_HC_LE_ACL_DATA_PACKET_LENGTH - self.hc_total_num_le_acl_data_packets = HOST_HC_TOTAL_NUM_LE_ACL_DATA_PACKETS - self.hc_acl_data_packet_length = HOST_DEFAULT_HC_ACL_DATA_PACKET_LENGTH - self.hc_total_num_acl_data_packets = HOST_HC_TOTAL_NUM_ACL_DATA_PACKETS - self.acl_packet_queue = collections.deque() - self.acl_packets_in_flight = 0 self.local_version = None self.local_supported_commands = bytes(64) self.local_le_features = 0 @@ -254,46 +289,57 @@ class Host(AbortableEventEmitter): response = await self.send_command( HCI_Read_Buffer_Size_Command(), check_result=True ) - self.hc_acl_data_packet_length = ( + hc_acl_data_packet_length = ( response.return_parameters.hc_acl_data_packet_length ) - self.hc_total_num_acl_data_packets = ( + hc_total_num_acl_data_packets = ( response.return_parameters.hc_total_num_acl_data_packets ) logger.debug( 'HCI ACL flow control: ' - f'hc_acl_data_packet_length={self.hc_acl_data_packet_length},' - f'hc_total_num_acl_data_packets={self.hc_total_num_acl_data_packets}' + f'hc_acl_data_packet_length={hc_acl_data_packet_length},' + f'hc_total_num_acl_data_packets={hc_total_num_acl_data_packets}' ) + self.acl_packet_queue = AclPacketQueue( + max_packet_size=hc_acl_data_packet_length, + max_in_flight=hc_total_num_acl_data_packets, + send=self.send_hci_packet, + ) + + hc_le_acl_data_packet_length = 0 + hc_total_num_le_acl_data_packets = 0 if self.supports_command(HCI_LE_READ_BUFFER_SIZE_COMMAND): response = await self.send_command( HCI_LE_Read_Buffer_Size_Command(), check_result=True ) - self.hc_le_acl_data_packet_length = ( + hc_le_acl_data_packet_length = ( response.return_parameters.hc_le_acl_data_packet_length ) - self.hc_total_num_le_acl_data_packets = ( + hc_total_num_le_acl_data_packets = ( response.return_parameters.hc_total_num_le_acl_data_packets ) logger.debug( 'HCI LE ACL flow control: ' - f'hc_le_acl_data_packet_length={self.hc_le_acl_data_packet_length},' - 'hc_total_num_le_acl_data_packets=' - f'{self.hc_total_num_le_acl_data_packets}' + f'hc_le_acl_data_packet_length={hc_le_acl_data_packet_length},' + f'hc_total_num_le_acl_data_packets={hc_total_num_le_acl_data_packets}' ) - if ( - response.return_parameters.hc_le_acl_data_packet_length == 0 - or response.return_parameters.hc_total_num_le_acl_data_packets == 0 - ): - # LE and Classic share the same values - self.hc_le_acl_data_packet_length = self.hc_acl_data_packet_length - self.hc_total_num_le_acl_data_packets = ( - self.hc_total_num_acl_data_packets - ) + if ( + hc_le_acl_data_packet_length == 0 + or hc_total_num_le_acl_data_packets == 0 + ): + # LE and Classic share the same queue + self.le_acl_packet_queue = self.acl_packet_queue + else: + # Create a separate queue for LE + self.le_acl_packet_queue = AclPacketQueue( + max_packet_size=hc_le_acl_data_packet_length, + max_in_flight=hc_total_num_le_acl_data_packets, + send=self.send_hci_packet, + ) if self.supports_command( HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND @@ -332,14 +378,13 @@ class Host(AbortableEventEmitter): self.hci_metadata = getattr(source, 'metadata', self.hci_metadata) def send_hci_packet(self, packet: HCI_Packet) -> None: + logger.debug(f'{color("### HOST -> CONTROLLER", "blue")}: {packet}') if self.snooper: self.snooper.snoop(bytes(packet), Snooper.Direction.HOST_TO_CONTROLLER) if self.hci_sink: self.hci_sink.on_packet(bytes(packet)) async def send_command(self, command, check_result=False): - logger.debug(f'{color("### HOST -> CONTROLLER", "blue")}: {command}') - # Wait until we can send (only one pending command at a time) async with self.command_semaphore: assert self.pending_command is None @@ -387,6 +432,17 @@ class Host(AbortableEventEmitter): asyncio.create_task(send_command(command)) def send_l2cap_pdu(self, connection_handle: int, cid: int, pdu: bytes) -> None: + if not (connection := self.connections.get(connection_handle)): + logger.warning(f'connection 0x{connection_handle:04X} not found') + return + packet_queue = connection.acl_packet_queue + if packet_queue is None: + logger.warning( + f'no ACL packet queue for connection 0x{connection_handle:04X}' + ) + return + + # Create a PDU l2cap_pdu = bytes(L2CAP_PDU(cid, pdu)) # Send the data to the controller via ACL packets @@ -394,8 +450,7 @@ class Host(AbortableEventEmitter): offset = 0 pb_flag = 0 while bytes_remaining: - # TODO: support different LE/Classic lengths - data_total_length = min(bytes_remaining, self.hc_le_acl_data_packet_length) + data_total_length = min(bytes_remaining, packet_queue.max_packet_size) acl_packet = HCI_AclDataPacket( connection_handle=connection_handle, pb_flag=pb_flag, @@ -403,34 +458,12 @@ class Host(AbortableEventEmitter): data_total_length=data_total_length, data=l2cap_pdu[offset : offset + data_total_length], ) - logger.debug( - f'{color("### HOST -> CONTROLLER", "blue")}: (CID={cid}) {acl_packet}' - ) - self.queue_acl_packet(acl_packet) + logger.debug(f'>>> ACL packet enqueue: (CID={cid}) {acl_packet}') + packet_queue.enqueue(acl_packet) pb_flag = 1 offset += data_total_length bytes_remaining -= data_total_length - def queue_acl_packet(self, acl_packet: HCI_AclDataPacket) -> None: - self.acl_packet_queue.appendleft(acl_packet) - self.check_acl_packet_queue() - - if len(self.acl_packet_queue): - logger.debug( - f'{self.acl_packets_in_flight} ACL packets in flight, ' - f'{len(self.acl_packet_queue)} in queue' - ) - - def check_acl_packet_queue(self) -> None: - # Send all we can (TODO: support different LE/Classic limits) - while ( - len(self.acl_packet_queue) > 0 - and self.acl_packets_in_flight < self.hc_total_num_le_acl_data_packets - ): - packet = self.acl_packet_queue.pop() - self.send_hci_packet(packet) - self.acl_packets_in_flight += 1 - def supports_command(self, command): # Find the support flag position for this command for octet, flags in enumerate(HCI_SUPPORTED_COMMANDS_FLAGS): @@ -553,7 +586,7 @@ class Host(AbortableEventEmitter): # This is used just for the Num_HCI_Command_Packets field, not related to # an actual command logger.debug('no-command event') - return None + return return self.on_command_processed(event) @@ -561,18 +594,17 @@ class Host(AbortableEventEmitter): return self.on_command_processed(event) def on_hci_number_of_completed_packets_event(self, event): - total_packets = sum(event.num_completed_packets) - if total_packets <= self.acl_packets_in_flight: - self.acl_packets_in_flight -= total_packets - self.check_acl_packet_queue() - else: - logger.warning( - color( - '!!! {total_packets} completed but only ' - f'{self.acl_packets_in_flight} in flight' + for i, connection_handle in enumerate(event.connection_handles): + if not (connection := self.connections.get(connection_handle)): + logger.warning( + 'received packet completion event for unknown handle ' + f'0x{connection_handle:04X}' ) + continue + + connection.acl_packet_queue.on_packets_completed( + event.num_completed_packets[i] ) - self.acl_packets_in_flight = 0 # Classic only def on_hci_connection_request_event(self, event): diff --git a/bumble/l2cap.py b/bumble/l2cap.py index ce3385d..7dc45d8 100644 --- a/bumble/l2cap.py +++ b/bumble/l2cap.py @@ -151,8 +151,8 @@ L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_CREDITS = 65535 L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MTU = 23 L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MPS = 23 L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_MPS = 65533 -L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MTU = 2046 -L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS = 2048 +L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MTU = 2048 +L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS = 2046 L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_INITIAL_CREDITS = 256 L2CAP_MAXIMUM_TRANSMISSION_UNIT_CONFIGURATION_OPTION_TYPE = 0x01 diff --git a/bumble/rfcomm.py b/bumble/rfcomm.py index 90b28af..09bc2f2 100644 --- a/bumble/rfcomm.py +++ b/bumble/rfcomm.py @@ -118,8 +118,8 @@ CRC_TABLE = bytes([ 0XBA, 0X2B, 0X59, 0XC8, 0XBD, 0X2C, 0X5E, 0XCF ]) -RFCOMM_DEFAULT_INITIAL_RX_CREDITS = 7 -RFCOMM_DEFAULT_PREFERRED_MTU = 1280 +RFCOMM_DEFAULT_WINDOW_SIZE = 16 +RFCOMM_DEFAULT_MAX_FRAME_SIZE = 2000 RFCOMM_DYNAMIC_CHANNEL_NUMBER_START = 1 RFCOMM_DYNAMIC_CHANNEL_NUMBER_END = 30 @@ -438,14 +438,16 @@ class DLC(EventEmitter): multiplexer: Multiplexer, dlci: int, max_frame_size: int, - initial_tx_credits: int, + window_size: int, ) -> None: super().__init__() self.multiplexer = multiplexer self.dlci = dlci - self.rx_credits = RFCOMM_DEFAULT_INITIAL_RX_CREDITS - self.rx_threshold = self.rx_credits // 2 - self.tx_credits = initial_tx_credits + self.max_frame_size = max_frame_size + self.window_size = window_size + self.rx_credits = window_size + self.rx_threshold = window_size // 2 + self.tx_credits = window_size self.tx_buffer = b'' self.state = DLC.State.INIT self.role = multiplexer.role @@ -537,11 +539,11 @@ class DLC(EventEmitter): if len(data) and self.sink: self.sink(data) # pylint: disable=not-callable - # Update the credits - if self.rx_credits > 0: - self.rx_credits -= 1 - else: - logger.warning(color('!!! received frame with no rx credits', 'red')) + # Update the credits + if self.rx_credits > 0: + self.rx_credits -= 1 + else: + logger.warning(color('!!! received frame with no rx credits', 'red')) # Check if there's anything to send (including credits) self.process_tx() @@ -580,9 +582,9 @@ class DLC(EventEmitter): cl=0xE0, priority=7, ack_timer=0, - max_frame_size=RFCOMM_DEFAULT_PREFERRED_MTU, + max_frame_size=self.max_frame_size, max_retransmissions=0, - window_size=RFCOMM_DEFAULT_INITIAL_RX_CREDITS, + window_size=self.window_size, ) mcc = RFCOMM_Frame.make_mcc(mcc_type=RFCOMM_MCC_PN_TYPE, c_r=0, data=bytes(pn)) logger.debug(f'>>> PN Response: {pn}') @@ -591,7 +593,7 @@ class DLC(EventEmitter): def rx_credits_needed(self) -> int: if self.rx_credits <= self.rx_threshold: - return RFCOMM_DEFAULT_INITIAL_RX_CREDITS - self.rx_credits + return self.window_size - self.rx_credits return 0 @@ -843,7 +845,12 @@ class Multiplexer(EventEmitter): ) await self.disconnection_result - async def open_dlc(self, channel: int) -> DLC: + async def open_dlc( + self, + channel: int, + max_frame_size: int = RFCOMM_DEFAULT_MAX_FRAME_SIZE, + window_size: int = RFCOMM_DEFAULT_WINDOW_SIZE, + ) -> DLC: if self.state != Multiplexer.State.CONNECTED: if self.state == Multiplexer.State.OPENING: raise InvalidStateError('open already in progress') @@ -855,9 +862,9 @@ class Multiplexer(EventEmitter): cl=0xF0, priority=7, ack_timer=0, - max_frame_size=RFCOMM_DEFAULT_PREFERRED_MTU, + max_frame_size=max_frame_size, max_retransmissions=0, - window_size=RFCOMM_DEFAULT_INITIAL_RX_CREDITS, + window_size=window_size, ) mcc = RFCOMM_Frame.make_mcc(mcc_type=RFCOMM_MCC_PN_TYPE, c_r=1, data=bytes(pn)) logger.debug(f'>>> Sending MCC: {pn}') diff --git a/bumble/transport/usb.py b/bumble/transport/usb.py index d48b239..1257260 100644 --- a/bumble/transport/usb.py +++ b/bumble/transport/usb.py @@ -108,7 +108,7 @@ async def open_usb_transport(spec: str) -> Transport: USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER, ) - READ_SIZE = 1024 + READ_SIZE = 4096 class UsbPacketSink: def __init__(self, device, acl_out):