diff --git a/apps/auracast.py b/apps/auracast.py index 56890b3..2fd930d 100644 --- a/apps/auracast.py +++ b/apps/auracast.py @@ -128,7 +128,7 @@ class BroadcastScanner(bumble.utils.EventEmitter): broadcast_audio_announcement: Optional[bap.BroadcastAudioAnnouncement] = None basic_audio_announcement: Optional[bap.BasicAudioAnnouncement] = None appearance: Optional[core.Appearance] = None - biginfo: Optional[bumble.device.BIGInfoAdvertisement] = None + biginfo: Optional[bumble.device.BigInfoAdvertisement] = None manufacturer_data: Optional[tuple[str, bytes]] = None def __post_init__(self) -> None: @@ -255,8 +255,10 @@ class BroadcastScanner(bumble.utils.EventEmitter): print(color(' SDU Interval: ', 'magenta'), self.biginfo.sdu_interval) print(color(' Max SDU: ', 'magenta'), self.biginfo.max_sdu) print(color(' PHY: ', 'magenta'), self.biginfo.phy.name) - print(color(' Framed: ', 'magenta'), self.biginfo.framed) - print(color(' Encrypted: ', 'magenta'), self.biginfo.encrypted) + print(color(' Framing: ', 'magenta'), self.biginfo.framing.name) + print( + color(' Encryption: ', 'magenta'), self.biginfo.encryption.name + ) def on_sync_establishment(self) -> None: self.emit('sync_establishment') @@ -286,7 +288,7 @@ class BroadcastScanner(bumble.utils.EventEmitter): self.emit('change') def on_biginfo_advertisement( - self, advertisement: bumble.device.BIGInfoAdvertisement + self, advertisement: bumble.device.BigInfoAdvertisement ) -> None: self.biginfo = advertisement self.emit('change') diff --git a/apps/bench.py b/apps/bench.py index 7d815d2..5941001 100644 --- a/apps/bench.py +++ b/apps/bench.py @@ -36,7 +36,15 @@ from bumble.core import ( CommandTimeoutError, ) from bumble.colors import color -from bumble.device import Connection, ConnectionParametersPreferences, Device, Peer +from bumble.core import ConnectionPHY +from bumble.device import ( + CigParameters, + CisLink, + Connection, + ConnectionParametersPreferences, + Device, + Peer, +) from bumble.gatt import Characteristic, CharacteristicValue, Service from bumble.hci import ( HCI_LE_1M_PHY, @@ -46,6 +54,7 @@ from bumble.hci import ( HCI_Constant, HCI_Error, HCI_StatusError, + HCI_IsoDataPacket, ) from bumble.sdp import ( SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID, @@ -83,11 +92,21 @@ SPEED_TX_UUID = 'E789C754-41A1-45F4-A948-A0A1A90DBA53' SPEED_RX_UUID = '016A2CC7-E14B-4819-935F-1F56EAE4098D' DEFAULT_RFCOMM_UUID = 'E6D55659-C8B4-4B85-96BB-B1143AF6D3AE' + DEFAULT_L2CAP_PSM = 128 DEFAULT_L2CAP_MAX_CREDITS = 128 DEFAULT_L2CAP_MTU = 1024 DEFAULT_L2CAP_MPS = 1024 +DEFAULT_ISO_MAX_SDU_C_TO_P = 251 +DEFAULT_ISO_MAX_SDU_P_TO_C = 251 +DEFAULT_ISO_SDU_INTERVAL_C_TO_P = 10000 +DEFAULT_ISO_SDU_INTERVAL_P_TO_C = 10000 +DEFAULT_ISO_MAX_TRANSPORT_LATENCY_C_TO_P = 35 +DEFAULT_ISO_MAX_TRANSPORT_LATENCY_P_TO_C = 35 +DEFAULT_ISO_RTN_C_TO_P = 3 +DEFAULT_ISO_RTN_P_TO_C = 3 + DEFAULT_LINGER_TIME = 1.0 DEFAULT_POST_CONNECTION_WAIT_TIME = 1.0 @@ -104,14 +123,14 @@ def le_phy_name(phy_id): ) -def print_connection_phy(phy): +def print_connection_phy(phy: ConnectionPHY) -> None: logging.info( color('@@@ PHY: ', 'yellow') + f'TX:{le_phy_name(phy.tx_phy)}/' f'RX:{le_phy_name(phy.rx_phy)}' ) -def print_connection(connection): +def print_connection(connection: Connection) -> None: params = [] if connection.transport == PhysicalTransport.LE: params.append( @@ -136,6 +155,34 @@ def print_connection(connection): logging.info(color('@@@ Connection: ', 'yellow') + ' '.join(params)) +def print_cis_link(cis_link: CisLink) -> None: + logging.info(color("@@@ CIS established", "green")) + logging.info(color('@@@ ISO interval: ', 'green') + f"{cis_link.iso_interval}ms") + logging.info(color('@@@ NSE: ', 'green') + f"{cis_link.nse}") + logging.info(color('@@@ Central->Peripheral:', 'green')) + if cis_link.phy_c_to_p is not None: + logging.info( + color('@@@ PHY: ', 'green') + f"{cis_link.phy_c_to_p.name}" + ) + logging.info( + color('@@@ Latency: ', 'green') + f"{cis_link.transport_latency_c_to_p}µs" + ) + logging.info(color('@@@ BN: ', 'green') + f"{cis_link.bn_c_to_p}") + logging.info(color('@@@ FT: ', 'green') + f"{cis_link.ft_c_to_p}") + logging.info(color('@@@ Max PDU: ', 'green') + f"{cis_link.max_pdu_c_to_p}") + logging.info(color('@@@ Peripheral->Central:', 'green')) + if cis_link.phy_p_to_c is not None: + logging.info( + color('@@@ PHY: ', 'green') + f"{cis_link.phy_p_to_c.name}" + ) + logging.info( + color('@@@ Latency: ', 'green') + f"{cis_link.transport_latency_p_to_c}µs" + ) + logging.info(color('@@@ BN: ', 'green') + f"{cis_link.bn_p_to_c}") + logging.info(color('@@@ FT: ', 'green') + f"{cis_link.ft_p_to_c}") + logging.info(color('@@@ Max PDU: ', 'green') + f"{cis_link.max_pdu_p_to_c}") + + def make_sdp_records(channel): return { 0x00010001: [ @@ -461,7 +508,8 @@ class Sender: self.bytes_sent += len(packet) await self.packet_io.send_packet(packet) - await self.done.wait() + if self.packet_io.can_receive(): + await self.done.wait() run_counter = f'[{run + 1} of {self.repeat + 1}]' if self.repeat else '' logging.info(color(f'=== {run_counter} Done!', 'magenta')) @@ -491,6 +539,9 @@ class Sender: ) self.done.set() + def is_sender(self): + return True + # ----------------------------------------------------------------------------- # Receiver @@ -538,7 +589,8 @@ class Receiver: logging.info( color( f'!!! Unexpected packet, expected {self.expected_packet_index} ' - f'but received {packet.sequence}' + f'but received {packet.sequence}', + 'red', ) ) @@ -581,6 +633,9 @@ class Receiver: await self.done.wait() logging.info(color('=== Done!', 'magenta')) + def is_sender(self): + return False + # ----------------------------------------------------------------------------- # Ping @@ -716,7 +771,8 @@ class Ping: color( f'!!! Unexpected packet, ' f'expected {self.next_expected_packet_index} ' - f'but received {packet.sequence}' + f'but received {packet.sequence}', + 'red', ) ) @@ -724,6 +780,9 @@ class Ping: self.done.set() return + def is_sender(self): + return True + # ----------------------------------------------------------------------------- # Pong @@ -768,7 +827,8 @@ class Pong: logging.info( color( f'!!! Unexpected packet, expected {self.expected_packet_index} ' - f'but received {packet.sequence}' + f'but received {packet.sequence}', + 'red', ) ) @@ -790,6 +850,9 @@ class Pong: await self.done.wait() logging.info(color('=== Done!', 'magenta')) + def is_sender(self): + return False + # ----------------------------------------------------------------------------- # GattClient @@ -953,6 +1016,9 @@ class StreamedPacketIO: # pylint: disable-next=not-callable self.io_sink(struct.pack('>H', len(packet)) + packet) + def can_receive(self): + return True + # ----------------------------------------------------------------------------- # L2capClient @@ -1224,6 +1290,96 @@ class RfcommServer(StreamedPacketIO): await self.dlc.drain() +# ----------------------------------------------------------------------------- +# IsoClient +# ----------------------------------------------------------------------------- +class IsoClient(StreamedPacketIO): + def __init__( + self, + device: Device, + ) -> None: + super().__init__() + self.device = device + self.ready = asyncio.Event() + self.cis_link: Optional[CisLink] = None + + async def on_connection( + self, connection: Connection, cis_link: CisLink, sender: bool + ) -> None: + connection.on(connection.EVENT_DISCONNECTION, self.on_disconnection) + self.cis_link = cis_link + self.io_sink = cis_link.write + await cis_link.setup_data_path( + cis_link.Direction.HOST_TO_CONTROLLER + if sender + else cis_link.Direction.CONTROLLER_TO_HOST + ) + cis_link.sink = self.on_iso_packet + self.ready.set() + + def on_iso_packet(self, iso_packet: HCI_IsoDataPacket) -> None: + self.on_packet(iso_packet.iso_sdu_fragment) + + def on_disconnection(self, _): + pass + + async def drain(self): + if self.cis_link is None: + return + await self.cis_link.drain() + + def can_receive(self): + return False + + +# ----------------------------------------------------------------------------- +# IsoServer +# ----------------------------------------------------------------------------- +class IsoServer(StreamedPacketIO): + def __init__( + self, + device: Device, + ): + super().__init__() + self.device = device + self.cis_link: Optional[CisLink] = None + self.ready = asyncio.Event() + + logging.info( + color( + '### Listening for ISO connection', + 'yellow', + ) + ) + + async def on_connection( + self, connection: Connection, cis_link: CisLink, sender: bool + ) -> None: + connection.on(connection.EVENT_DISCONNECTION, self.on_disconnection) + self.io_sink = cis_link.write + await cis_link.setup_data_path( + cis_link.Direction.HOST_TO_CONTROLLER + if sender + else cis_link.Direction.CONTROLLER_TO_HOST + ) + cis_link.sink = self.on_iso_packet + self.ready.set() + + def on_iso_packet(self, iso_packet: HCI_IsoDataPacket) -> None: + self.on_packet(iso_packet.iso_sdu_fragment) + + def on_disconnection(self, _): + pass + + async def drain(self): + if self.cis_link is None: + return + await self.cis_link.drain() + + def can_receive(self): + return False + + # ----------------------------------------------------------------------------- # Central # ----------------------------------------------------------------------------- @@ -1232,13 +1388,22 @@ class Central(Connection.Listener): self, transport, peripheral_address, - classic, scenario_factory, mode_factory, connection_interval, phy, authenticate, encrypt, + iso, + iso_sdu_interval_c_to_p, + iso_sdu_interval_p_to_c, + iso_max_sdu_c_to_p, + iso_max_sdu_p_to_c, + iso_max_transport_latency_c_to_p, + iso_max_transport_latency_p_to_c, + iso_rtn_c_to_p, + iso_rtn_p_to_c, + classic, extended_data_length, role_switch, le_scan, @@ -1250,6 +1415,15 @@ class Central(Connection.Listener): self.transport = transport self.peripheral_address = peripheral_address self.classic = classic + self.iso = iso + self.iso_sdu_interval_c_to_p = iso_sdu_interval_c_to_p + self.iso_sdu_interval_p_to_c = iso_sdu_interval_p_to_c + self.iso_max_sdu_c_to_p = iso_max_sdu_c_to_p + self.iso_max_sdu_p_to_c = iso_max_sdu_p_to_c + self.iso_max_transport_latency_c_to_p = iso_max_transport_latency_c_to_p + self.iso_max_transport_latency_p_to_c = iso_max_transport_latency_p_to_c + self.iso_rtn_c_to_p = iso_rtn_c_to_p + self.iso_rtn_p_to_c = iso_rtn_p_to_c self.scenario_factory = scenario_factory self.mode_factory = mode_factory self.authenticate = authenticate @@ -1308,6 +1482,13 @@ class Central(Connection.Listener): ) mode = self.mode_factory(self.device) scenario = self.scenario_factory(mode) + self.device.classic_enabled = self.classic + self.device.cis_enabled = self.iso + + # Set up a pairing config factory with minimal requirements. + self.device.pairing_config_factory = lambda _: PairingConfig( + sc=False, mitm=False, bonding=False + ) await pre_power_on(self.device, self.classic) await self.device.power_on() @@ -1392,7 +1573,72 @@ class Central(Connection.Listener): ) ) - await mode.on_connection(self.connection) + # Setup ISO streams. + if self.iso: + if scenario.is_sender(): + sdu_interval_c_to_p = ( + self.iso_sdu_interval_c_to_p or DEFAULT_ISO_SDU_INTERVAL_C_TO_P + ) + sdu_interval_p_to_c = self.iso_sdu_interval_p_to_c or 0 + max_transport_latency_c_to_p = ( + self.iso_max_transport_latency_c_to_p + or DEFAULT_ISO_MAX_TRANSPORT_LATENCY_C_TO_P + ) + max_transport_latency_p_to_c = ( + self.iso_max_transport_latency_p_to_c or 0 + ) + max_sdu_c_to_p = ( + self.iso_max_sdu_c_to_p or DEFAULT_ISO_MAX_SDU_C_TO_P + ) + max_sdu_p_to_c = self.iso_max_sdu_p_to_c or 0 + rtn_c_to_p = self.iso_rtn_c_to_p or DEFAULT_ISO_RTN_C_TO_P + rtn_p_to_c = self.iso_rtn_p_to_c or 0 + else: + sdu_interval_p_to_c = ( + self.iso_sdu_interval_p_to_c or DEFAULT_ISO_SDU_INTERVAL_P_TO_C + ) + sdu_interval_c_to_p = self.iso_sdu_interval_c_to_p or 0 + max_transport_latency_p_to_c = ( + self.iso_max_transport_latency_p_to_c + or DEFAULT_ISO_MAX_TRANSPORT_LATENCY_P_TO_C + ) + max_transport_latency_c_to_p = ( + self.iso_max_transport_latency_c_to_p or 0 + ) + max_sdu_p_to_c = ( + self.iso_max_sdu_p_to_c or DEFAULT_ISO_MAX_SDU_P_TO_C + ) + max_sdu_c_to_p = self.iso_max_sdu_c_to_p or 0 + rtn_p_to_c = self.iso_rtn_p_to_c or DEFAULT_ISO_RTN_P_TO_C + rtn_c_to_p = self.iso_rtn_c_to_p or 0 + cis_handles = await self.device.setup_cig( + CigParameters( + cig_id=1, + sdu_interval_c_to_p=sdu_interval_c_to_p, + sdu_interval_p_to_c=sdu_interval_p_to_c, + max_transport_latency_c_to_p=max_transport_latency_c_to_p, + max_transport_latency_p_to_c=max_transport_latency_p_to_c, + cis_parameters=[ + CigParameters.CisParameters( + cis_id=2, + max_sdu_c_to_p=max_sdu_c_to_p, + max_sdu_p_to_c=max_sdu_p_to_c, + rtn_c_to_p=rtn_c_to_p, + rtn_p_to_c=rtn_p_to_c, + ) + ], + ) + ) + cis_link = ( + await self.device.create_cis([(cis_handles[0], self.connection)]) + )[0] + print_cis_link(cis_link) + + await mode.on_connection( + self.connection, cis_link, scenario.is_sender() + ) + else: + await mode.on_connection(self.connection) await scenario.run() await asyncio.sleep(DEFAULT_LINGER_TIME) @@ -1428,6 +1674,7 @@ class Peripheral(Device.Listener, Connection.Listener): scenario_factory, mode_factory, classic, + iso, extended_data_length, role_switch, le_scan, @@ -1437,6 +1684,7 @@ class Peripheral(Device.Listener, Connection.Listener): ): self.transport = transport self.classic = classic + self.iso = iso self.scenario_factory = scenario_factory self.mode_factory = mode_factory self.extended_data_length = extended_data_length @@ -1470,6 +1718,13 @@ class Peripheral(Device.Listener, Connection.Listener): self.device.listener = self self.mode = self.mode_factory(self.device) self.scenario = self.scenario_factory(self.mode) + self.device.classic_enabled = self.classic + self.device.cis_enabled = self.iso + + # Set up a pairing config factory with minimal requirements. + self.device.pairing_config_factory = lambda _: PairingConfig( + sc=False, mitm=False, bonding=False + ) await pre_power_on(self.device, self.classic) await self.device.power_on() @@ -1501,7 +1756,21 @@ class Peripheral(Device.Listener, Connection.Listener): logging.info(color('### Connected', 'cyan')) print_connection(self.connection) - await self.mode.on_connection(self.connection) + if self.iso: + + async def on_cis_request(cis_link: CisLink) -> None: + logging.info(color("@@@ Accepting CIS", "green")) + await self.device.accept_cis_request(cis_link) + print_cis_link(cis_link) + + await self.mode.on_connection( + self.connection, cis_link, self.scenario.is_sender() + ) + + self.connection.on(self.connection.EVENT_CIS_REQUEST, on_cis_request) + else: + await self.mode.on_connection(self.connection) + await self.scenario.run() await asyncio.sleep(DEFAULT_LINGER_TIME) @@ -1613,6 +1882,12 @@ def create_mode_factory(ctx, default_mode): credits_threshold=ctx.obj['rfcomm_credits_threshold'], ) + if mode == 'iso-server': + return IsoServer(device) + + if mode == 'iso-client': + return IsoClient(device) + raise ValueError('invalid mode') return create_mode @@ -1640,6 +1915,9 @@ def create_scenario_factory(ctx, default_scenario): return Receiver(packet_io, ctx.obj['linger']) if scenario == 'ping': + if isinstance(packet_io, (IsoClient, IsoServer)): + raise ValueError('ping not supported with ISO') + return Ping( packet_io, start_delay=ctx.obj['start_delay'], @@ -1651,6 +1929,9 @@ def create_scenario_factory(ctx, default_scenario): ) if scenario == 'pong': + if isinstance(packet_io, (IsoClient, IsoServer)): + raise ValueError('pong not supported with ISO') + return Pong(packet_io, ctx.obj['linger']) raise ValueError('invalid scenario') @@ -1674,6 +1955,8 @@ def create_scenario_factory(ctx, default_scenario): 'l2cap-server', 'rfcomm-client', 'rfcomm-server', + 'iso-client', + 'iso-server', ] ), ) @@ -1896,6 +2179,7 @@ def bench( ctx.obj['classic_page_scan'] = classic_page_scan ctx.obj['classic_inquiry_scan'] = classic_inquiry_scan ctx.obj['classic'] = mode in ('rfcomm-client', 'rfcomm-server') + ctx.obj['iso'] = mode in ('iso-client', 'iso-server') @bench.command() @@ -1917,26 +2201,88 @@ def bench( @click.option('--phy', type=click.Choice(['1m', '2m', 'coded']), help='PHY to use') @click.option('--authenticate', is_flag=True, help='Authenticate (RFComm only)') @click.option('--encrypt', is_flag=True, help='Encrypt the connection (RFComm only)') +@click.option( + '--iso-sdu-interval-c-to-p', + type=int, + help='ISO SDU central -> peripheral (microseconds)', +) +@click.option( + '--iso-sdu-interval-p-to-c', + type=int, + help='ISO SDU interval peripheral -> central (microseconds)', +) +@click.option( + '--iso-max-sdu-c-to-p', + type=int, + help='ISO max SDU central -> peripheral', +) +@click.option( + '--iso-max-sdu-p-to-c', + type=int, + help='ISO max SDU peripheral -> central', +) +@click.option( + '--iso-max-transport-latency-c-to-p', + type=int, + help='ISO max transport latency central -> peripheral (milliseconds)', +) +@click.option( + '--iso-max-transport-latency-p-to-c', + type=int, + help='ISO max transport latency peripheral -> central (milliseconds)', +) +@click.option( + '--iso-rtn-c-to-p', + type=int, + help='ISO RTN central -> peripheral (integer count)', +) +@click.option( + '--iso-rtn-p-to-c', + type=int, + help='ISO RTN peripheral -> central (integer count)', +) @click.pass_context def central( - ctx, transport, peripheral_address, connection_interval, phy, authenticate, encrypt + ctx, + transport, + peripheral_address, + connection_interval, + phy, + authenticate, + encrypt, + iso_sdu_interval_c_to_p, + iso_sdu_interval_p_to_c, + iso_max_sdu_c_to_p, + iso_max_sdu_p_to_c, + iso_max_transport_latency_c_to_p, + iso_max_transport_latency_p_to_c, + iso_rtn_c_to_p, + iso_rtn_p_to_c, ): """Run as a central (initiates the connection)""" scenario_factory = create_scenario_factory(ctx, 'send') mode_factory = create_mode_factory(ctx, 'gatt-client') - classic = ctx.obj['classic'] async def run_central(): await Central( transport, peripheral_address, - classic, scenario_factory, mode_factory, connection_interval, phy, authenticate, encrypt or authenticate, + ctx.obj['iso'], + iso_sdu_interval_c_to_p, + iso_sdu_interval_p_to_c, + iso_max_sdu_c_to_p, + iso_max_sdu_p_to_c, + iso_max_transport_latency_c_to_p, + iso_max_transport_latency_p_to_c, + iso_rtn_c_to_p, + iso_rtn_p_to_c, + ctx.obj['classic'], ctx.obj['extended_data_length'], ctx.obj['role_switch'], ctx.obj['le_scan'], @@ -1962,6 +2308,7 @@ def peripheral(ctx, transport): scenario_factory, mode_factory, ctx.obj['classic'], + ctx.obj['iso'], ctx.obj['extended_data_length'], ctx.obj['role_switch'], ctx.obj['le_scan'], diff --git a/apps/controller_info.py b/apps/controller_info.py index 2c8d9aa..67f40ae 100644 --- a/apps/controller_info.py +++ b/apps/controller_info.py @@ -242,7 +242,9 @@ async def get_codecs_info(host: Host) -> None: # ----------------------------------------------------------------------------- -async def async_main(latency_probes, transport): +async def async_main( + latency_probes, latency_probe_interval, latency_probe_command, transport +): print('<<< connecting to HCI...') async with await open_transport_or_link(transport) as (hci_source, hci_sink): print('<<< connected') @@ -251,19 +253,32 @@ async def async_main(latency_probes, transport): await host.reset() # Measure the latency if requested + # (we add an extra probe at the start, that we ignore, just to ensure that + # the transport is primed) latencies = [] if latency_probes: - for _ in range(latency_probes): + if latency_probe_command: + probe_hci_command = HCI_Command.from_bytes( + bytes.fromhex(latency_probe_command) + ) + else: + probe_hci_command = HCI_Read_Local_Version_Information_Command() + + for iteration in range(1 + latency_probes): + if latency_probe_interval: + await asyncio.sleep(latency_probe_interval / 1000) start = time.time() - await host.send_command(HCI_Read_Local_Version_Information_Command()) - latencies.append(1000 * (time.time() - start)) + await host.send_command(probe_hci_command) + if iteration: + latencies.append(1000 * (time.time() - start)) print( color('HCI Command Latency:', 'yellow'), ( f'min={min(latencies):.2f}, ' f'max={max(latencies):.2f}, ' - f'average={sum(latencies)/len(latencies):.2f}' + f'average={sum(latencies)/len(latencies):.2f},' ), + [f'{latency:.4}' for latency in latencies], '\n', ) @@ -311,10 +326,32 @@ async def async_main(latency_probes, transport): type=int, help='Send N commands to measure HCI transport latency statistics', ) +@click.option( + '--latency-probe-interval', + metavar='INTERVAL', + type=int, + help='Interval between latency probes (milliseconds)', +) +@click.option( + '--latency-probe-command', + metavar='COMMAND_HEX', + help=( + 'Probe command (HCI Command packet bytes, in hex. Use 0177FC00 for' + ' a loopback test with the HCI remote proxy app)' + ), +) @click.argument('transport') -def main(latency_probes, transport): - logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper()) - asyncio.run(async_main(latency_probes, transport)) +def main(latency_probes, latency_probe_interval, latency_probe_command, transport): + logging.basicConfig( + level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper(), + format="[%(asctime)s.%(msecs)03d] %(levelname)s:%(name)s:%(message)s", + datefmt="%H:%M:%S", + ) + asyncio.run( + async_main( + latency_probes, latency_probe_interval, latency_probe_command, transport + ) + ) # ----------------------------------------------------------------------------- diff --git a/apps/controller_loopback.py b/apps/controller_loopback.py index 2d16bb9..434704f 100644 --- a/apps/controller_loopback.py +++ b/apps/controller_loopback.py @@ -194,7 +194,11 @@ class Loopback: ) @click.argument('transport') def main(packet_size, packet_count, transport): - logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper()) + logging.basicConfig( + level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper(), + format="[%(asctime)s.%(msecs)03d] %(levelname)s:%(name)s:%(message)s", + datefmt="%H:%M:%S", + ) loopback = Loopback(packet_size, packet_count, transport) asyncio.run(loopback.run()) diff --git a/apps/lea_unicast/app.py b/apps/lea_unicast/app.py index 3c74cdb..79975de 100644 --- a/apps/lea_unicast/app.py +++ b/apps/lea_unicast/app.py @@ -337,7 +337,12 @@ class Speaker: ), ( AdvertisingData.FLAGS, - bytes([AdvertisingData.LE_GENERAL_DISCOVERABLE_MODE_FLAG]), + bytes( + [ + AdvertisingData.LE_GENERAL_DISCOVERABLE_MODE_FLAG + | AdvertisingData.BR_EDR_NOT_SUPPORTED_FLAG + ] + ), ), ( AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS, diff --git a/bumble/controller.py b/bumble/controller.py index c9bdae7..e0f3319 100644 --- a/bumble/controller.py +++ b/bumble/controller.py @@ -618,8 +618,8 @@ class Controller: cis_sync_delay=0, transport_latency_c_to_p=0, transport_latency_p_to_c=0, - phy_c_to_p=0, - phy_p_to_c=0, + phy_c_to_p=1, + phy_p_to_c=1, nse=0, bn_c_to_p=0, bn_p_to_c=0, diff --git a/bumble/device.py b/bumble/device.py index a92af56..9c11b1e 100644 --- a/bumble/device.py +++ b/bumble/device.py @@ -139,6 +139,9 @@ DEVICE_DEFAULT_ADVERTISING_TX_POWER = ( DEVICE_DEFAULT_PERIODIC_ADVERTISING_SYNC_SKIP = 0 DEVICE_DEFAULT_PERIODIC_ADVERTISING_SYNC_TIMEOUT = 5.0 DEVICE_DEFAULT_LE_RPA_TIMEOUT = 15 * 60 # 15 minutes (in seconds) +DEVICE_DEFAULT_ISO_CIS_MAX_SDU = 251 +DEVICE_DEFAULT_ISO_CIS_RTN = 10 +DEVICE_DEFAULT_ISO_CIS_MAX_TRANSPORT_LATENCY = 100 # fmt: on # pylint: enable=line-too-long @@ -489,7 +492,18 @@ class PeriodicAdvertisement: # ----------------------------------------------------------------------------- @dataclass -class BIGInfoAdvertisement: +class BigInfoAdvertisement: + class Framing(utils.OpenIntEnum): + # fmt: off + UNFRAMED = 0X00 + FRAMED_SEGMENTABLE_MODE = 0X01 + FRAMED_UNSEGMENTED_MODE = 0X02 + + class Encryption(utils.OpenIntEnum): + # fmt: off + UNENCRYPTED = 0x00 + ENCRYPTED = 0x01 + address: hci.Address sid: int num_bis: int @@ -502,8 +516,8 @@ class BIGInfoAdvertisement: sdu_interval: int max_sdu: int phy: hci.Phy - framed: bool - encrypted: bool + framing: Framing + encryption: Encryption @classmethod def from_report(cls, address: hci.Address, sid: int, report) -> Self: @@ -520,8 +534,8 @@ class BIGInfoAdvertisement: report.sdu_interval, report.max_sdu, hci.Phy(report.phy), - report.framing != 0, - report.encryption != 0, + cls.Framing(report.framing), + cls.Encryption(report.encryption), ) @@ -1013,7 +1027,7 @@ class PeriodicAdvertisingSync(utils.EventEmitter): def on_biginfo_advertising_report(self, report) -> None: self.emit( self.EVENT_BIGINFO_ADVERTISEMENT, - BIGInfoAdvertisement.from_report(self.advertiser_address, self.sid, report), + BigInfoAdvertisement.from_report(self.advertiser_address, self.sid, report), ) def __str__(self) -> str: @@ -1031,14 +1045,24 @@ class PeriodicAdvertisingSync(utils.EventEmitter): # ----------------------------------------------------------------------------- @dataclass class BigParameters: + class Packing(utils.OpenIntEnum): + # fmt: off + SEQUENTIAL = 0x00 + INTERLEAVED = 0x01 + + class Framing(utils.OpenIntEnum): + # fmt: off + UNFRAMED = 0x00 + FRAMED = 0x01 + num_bis: int - sdu_interval: int + sdu_interval: int # SDU interval, in microseconds max_sdu: int - max_transport_latency: int + max_transport_latency: int # Max transport latency, in milliseconds rtn: int phy: hci.PhyBit = hci.PhyBit.LE_2M - packing: int = 0 - framing: int = 0 + packing: Packing = Packing.SEQUENTIAL + framing: Framing = Framing.UNFRAMED broadcast_code: bytes | None = None @@ -1061,15 +1085,15 @@ class Big(utils.EventEmitter): state: State = State.PENDING # Attributes provided by BIG Create Complete event - big_sync_delay: int = 0 - transport_latency_big: int = 0 - phy: int = 0 + big_sync_delay: int = 0 # Sync delay, in microseconds + transport_latency_big: int = 0 # Transport latency, in microseconds + phy: hci.Phy = hci.Phy.LE_1M nse: int = 0 bn: int = 0 pto: int = 0 irc: int = 0 max_pdu: int = 0 - iso_interval: float = 0.0 + iso_interval: float = 0.0 # ISO interval, in milliseconds bis_links: Sequence[BisLink] = () def __post_init__(self) -> None: @@ -1499,10 +1523,74 @@ class _IsoLink: """Write an ISO SDU.""" self.device.host.send_iso_sdu(connection_handle=self.handle, sdu=sdu) + async def get_tx_time_stamp(self) -> tuple[int, int, int]: + response = await self.device.host.send_command( + hci.HCI_LE_Read_ISO_TX_Sync_Command(connection_handle=self.handle), + check_result=True, + ) + return ( + response.return_parameters.packet_sequence_number, + response.return_parameters.tx_time_stamp, + response.return_parameters.time_offset, + ) + @property def data_packet_queue(self) -> DataPacketQueue | None: return self.device.host.get_data_packet_queue(self.handle) + async def drain(self) -> None: + if data_packet_queue := self.data_packet_queue: + await data_packet_queue.drain(self.handle) + + +# ----------------------------------------------------------------------------- +@dataclass +class CigParameters: + class WorstCaseSca(utils.OpenIntEnum): + # fmt: off + SCA_251_TO_500_PPM = 0x00 + SCA_151_TO_250_PPM = 0x01 + SCA_101_TO_150_PPM = 0x02 + SCA_76_TO_100_PPM = 0x03 + SCA_51_TO_75_PPM = 0x04 + SCA_31_TO_50_PPM = 0x05 + SCA_21_TO_30_PPM = 0x06 + SCA_0_TO_20_PPM = 0x07 + + class Packing(utils.OpenIntEnum): + # fmt: off + SEQUENTIAL = 0x00 + INTERLEAVED = 0x01 + + class Framing(utils.OpenIntEnum): + # fmt: off + UNFRAMED = 0x00 + FRAMED = 0x01 + + @dataclass + class CisParameters: + cis_id: int + max_sdu_c_to_p: int = DEVICE_DEFAULT_ISO_CIS_MAX_SDU + max_sdu_p_to_c: int = DEVICE_DEFAULT_ISO_CIS_MAX_SDU + phy_c_to_p: hci.PhyBit = hci.PhyBit.LE_2M + phy_p_to_c: hci.PhyBit = hci.PhyBit.LE_2M + rtn_c_to_p: int = DEVICE_DEFAULT_ISO_CIS_RTN # Number of C->P retransmissions + rtn_p_to_c: int = DEVICE_DEFAULT_ISO_CIS_RTN # Number of P->C retransmissions + + cig_id: int + cis_parameters: list[CisParameters] + sdu_interval_c_to_p: int # C->P SDU interval, in microseconds + sdu_interval_p_to_c: int # P->C SDU interval, in microseconds + worst_case_sca: WorstCaseSca = WorstCaseSca.SCA_251_TO_500_PPM + packing: Packing = Packing.SEQUENTIAL + framing: Framing = Framing.UNFRAMED + max_transport_latency_c_to_p: int = ( + DEVICE_DEFAULT_ISO_CIS_MAX_TRANSPORT_LATENCY # Max C->P transport latency, in milliseconds + ) + max_transport_latency_p_to_c: int = ( + DEVICE_DEFAULT_ISO_CIS_MAX_TRANSPORT_LATENCY # Max C->P transport latency, in milliseconds + ) + # ----------------------------------------------------------------------------- @dataclass @@ -1516,6 +1604,20 @@ class CisLink(utils.EventEmitter, _IsoLink): handle: int # CIS handle assigned by Controller (in LE_Set_CIG_Parameters Complete or LE_CIS_Request events) cis_id: int # CIS ID assigned by Central device cig_id: int # CIG ID assigned by Central device + cig_sync_delay: int = 0 # CIG sync delay, in microseconds + cis_sync_delay: int = 0 # CIS sync delay, in microseconds + transport_latency_c_to_p: int = 0 # C->P transport latency, in microseconds + transport_latency_p_to_c: int = 0 # P->C transport latency, in microseconds + phy_c_to_p: Optional[hci.Phy] = None + phy_p_to_c: Optional[hci.Phy] = None + nse: int = 0 + bn_c_to_p: int = 0 + bn_p_to_c: int = 0 + ft_c_to_p: int = 0 + ft_p_to_c: int = 0 + max_pdu_c_to_p: int = 0 + max_pdu_p_to_c: int = 0 + iso_interval: float = 0.0 # ISO interval, in milliseconds state: State = State.PENDING sink: Callable[[hci.HCI_IsoDataPacket], Any] | None = None @@ -1598,6 +1700,7 @@ class Connection(utils.CompositeEventEmitter): peer_resolvable_address: Optional[hci.Address] peer_le_features: Optional[hci.LeFeatureMask] role: hci.Role + parameters: Parameters encryption: int encryption_key_size: int authenticated: bool @@ -1642,6 +1745,9 @@ class Connection(utils.CompositeEventEmitter): EVENT_PAIRING_FAILURE = "pairing_failure" EVENT_SECURITY_REQUEST = "security_request" EVENT_LINK_KEY = "link_key" + EVENT_CIS_REQUEST = "cis_request" + EVENT_CIS_ESTABLISHMENT = "cis_establishment" + EVENT_CIS_ESTABLISHMENT_FAILURE = "cis_establishment_failure" @utils.composite_listener class Listener: @@ -4569,48 +4675,39 @@ class Device(utils.CompositeEventEmitter): @utils.experimental('Only for testing.') async def setup_cig( self, - cig_id: int, - cis_id: Sequence[int], - sdu_interval: tuple[int, int], - framing: int, - max_sdu: tuple[int, int], - retransmission_number: int, - max_transport_latency: tuple[int, int], + parameters: CigParameters, ) -> list[int]: """Sends hci.HCI_LE_Set_CIG_Parameters_Command. Args: - cig_id: CIG_ID. - cis_id: CID ID list. - sdu_interval: SDU intervals of (Central->Peripheral, Peripheral->Cental). - framing: Un-framing(0) or Framing(1). - max_sdu: Max SDU counts of (Central->Peripheral, Peripheral->Cental). - retransmission_number: retransmission_number. - max_transport_latency: Max transport latencies of - (Central->Peripheral, Peripheral->Cental). + parameters: CIG parameters. Returns: List of created CIS handles corresponding to the same order of [cid_id]. """ - num_cis = len(cis_id) + num_cis = len(parameters.cis_parameters) response = await self.send_command( hci.HCI_LE_Set_CIG_Parameters_Command( - cig_id=cig_id, - sdu_interval_c_to_p=sdu_interval[0], - sdu_interval_p_to_c=sdu_interval[1], - worst_case_sca=0x00, # 251-500 ppm - packing=0x00, # Sequential - framing=framing, - max_transport_latency_c_to_p=max_transport_latency[0], - max_transport_latency_p_to_c=max_transport_latency[1], - cis_id=cis_id, - max_sdu_c_to_p=[max_sdu[0]] * num_cis, - max_sdu_p_to_c=[max_sdu[1]] * num_cis, - phy_c_to_p=[hci.HCI_LE_2M_PHY] * num_cis, - phy_p_to_c=[hci.HCI_LE_2M_PHY] * num_cis, - rtn_c_to_p=[retransmission_number] * num_cis, - rtn_p_to_c=[retransmission_number] * num_cis, + cig_id=parameters.cig_id, + sdu_interval_c_to_p=parameters.sdu_interval_c_to_p, + sdu_interval_p_to_c=parameters.sdu_interval_p_to_c, + worst_case_sca=parameters.worst_case_sca, + packing=int(parameters.packing), + framing=int(parameters.framing), + max_transport_latency_c_to_p=parameters.max_transport_latency_c_to_p, + max_transport_latency_p_to_c=parameters.max_transport_latency_p_to_c, + cis_id=[cis.cis_id for cis in parameters.cis_parameters], + max_sdu_c_to_p=[ + cis.max_sdu_c_to_p for cis in parameters.cis_parameters + ], + max_sdu_p_to_c=[ + cis.max_sdu_p_to_c for cis in parameters.cis_parameters + ], + phy_c_to_p=[cis.phy_c_to_p for cis in parameters.cis_parameters], + phy_p_to_c=[cis.phy_p_to_c for cis in parameters.cis_parameters], + rtn_c_to_p=[cis.rtn_c_to_p for cis in parameters.cis_parameters], + rtn_p_to_c=[cis.rtn_p_to_c for cis in parameters.cis_parameters], ), check_result=True, ) @@ -4618,19 +4715,17 @@ class Device(utils.CompositeEventEmitter): # Ideally, we should manage CIG lifecycle, but they are not useful for Unicast # Server, so here it only provides a basic functionality for testing. cis_handles = response.return_parameters.connection_handle[:] - for id, cis_handle in zip(cis_id, cis_handles): - self._pending_cis[cis_handle] = (id, cig_id) + for cis, cis_handle in zip(parameters.cis_parameters, cis_handles): + self._pending_cis[cis_handle] = (cis.cis_id, parameters.cig_id) return cis_handles # [LE only] @utils.experimental('Only for testing.') async def create_cis( - self, cis_acl_pairs: Sequence[tuple[int, int]] + self, cis_acl_pairs: Sequence[tuple[int, Connection]] ) -> list[CisLink]: - for cis_handle, acl_handle in cis_acl_pairs: - acl_connection = self.lookup_connection(acl_handle) - assert acl_connection + for cis_handle, acl_connection in cis_acl_pairs: cis_id, cig_id = self._pending_cis.pop(cis_handle) self.cis_links[cis_handle] = CisLink( device=self, @@ -4650,8 +4745,8 @@ class Device(utils.CompositeEventEmitter): if pending_future := pending_cis_establishments.get(cis_link.handle): pending_future.set_result(cis_link) - def on_cis_establishment_failure(cis_handle: int, status: int) -> None: - if pending_future := pending_cis_establishments.get(cis_handle): + def on_cis_establishment_failure(cis_link: CisLink, status: int) -> None: + if pending_future := pending_cis_establishments.get(cis_link.handle): pending_future.set_exception(hci.HCI_Error(status)) watcher.on(self, self.EVENT_CIS_ESTABLISHMENT, on_cis_establishment) @@ -4661,7 +4756,7 @@ class Device(utils.CompositeEventEmitter): await self.send_command( hci.HCI_LE_Create_CIS_Command( cis_connection_handle=[p[0] for p in cis_acl_pairs], - acl_connection_handle=[p[1] for p in cis_acl_pairs], + acl_connection_handle=[p[1].handle for p in cis_acl_pairs], ), check_result=True, ) @@ -4670,26 +4765,21 @@ class Device(utils.CompositeEventEmitter): # [LE only] @utils.experimental('Only for testing.') - async def accept_cis_request(self, handle: int) -> CisLink: + async def accept_cis_request(self, cis_link: CisLink) -> None: """[LE Only] Accepts an incoming CIS request. - When the specified CIS handle is already created, this method returns the - existed CIS link object immediately. + This method returns when the CIS is established, or raises an exception if + the CIS establishment fails. Args: handle: CIS handle to accept. - - Returns: - CIS link object on the given handle. """ - if not (cis_link := self.cis_links.get(handle)): - raise InvalidStateError(f'No pending CIS request of handle {handle}') # There might be multiple ASE sharing a CIS channel. # If one of them has accepted the request, the others should just leverage it. async with self._cis_lock: if cis_link.state == CisLink.State.ESTABLISHED: - return cis_link + return with closing(utils.EventWatcher()) as watcher: pending_establishment = asyncio.get_running_loop().create_future() @@ -4708,26 +4798,24 @@ class Device(utils.CompositeEventEmitter): ) await self.send_command( - hci.HCI_LE_Accept_CIS_Request_Command(connection_handle=handle), + hci.HCI_LE_Accept_CIS_Request_Command( + connection_handle=cis_link.handle + ), check_result=True, ) await pending_establishment - return cis_link - - # Mypy believes this is reachable when context is an ExitStack. - raise UnreachableError() # [LE only] @utils.experimental('Only for testing.') async def reject_cis_request( self, - handle: int, + cis_link: CisLink, reason: int = hci.HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR, ) -> None: await self.send_command( hci.HCI_LE_Reject_CIS_Request_Command( - connection_handle=handle, reason=reason + connection_handle=cis_link.handle, reason=reason ), check_result=True, ) @@ -5265,7 +5353,7 @@ class Device(utils.CompositeEventEmitter): big.bis_links = [BisLink(handle=handle, big=big) for handle in bis_handles] big.big_sync_delay = big_sync_delay big.transport_latency_big = transport_latency_big - big.phy = phy + big.phy = hci.Phy(phy) big.nse = nse big.bn = bn big.pto = pto @@ -5929,24 +6017,63 @@ class Device(utils.CompositeEventEmitter): f'cis_id=[0x{cis_id:02X}] ***' ) # LE_CIS_Established event doesn't provide info, so we must store them here. - self.cis_links[cis_handle] = CisLink( + cis_link = CisLink( device=self, acl_connection=acl_connection, handle=cis_handle, cig_id=cig_id, cis_id=cis_id, ) - self.emit(self.EVENT_CIS_REQUEST, acl_connection, cis_handle, cig_id, cis_id) + self.cis_links[cis_handle] = cis_link + acl_connection.emit(acl_connection.EVENT_CIS_REQUEST, cis_link) + self.emit(self.EVENT_CIS_REQUEST, cis_link) # [LE only] @host_event_handler @utils.experimental('Only for testing') - def on_cis_establishment(self, cis_handle: int) -> None: + def on_cis_establishment( + self, + cis_handle: int, + cig_sync_delay: int, + cis_sync_delay: int, + transport_latency_c_to_p: int, + transport_latency_p_to_c: int, + phy_c_to_p: int, + phy_p_to_c: int, + nse: int, + bn_c_to_p: int, + bn_p_to_c: int, + ft_c_to_p: int, + ft_p_to_c: int, + max_pdu_c_to_p: int, + max_pdu_p_to_c: int, + iso_interval: int, + ) -> None: + if cis_handle not in self.cis_links: + logger.warning("CIS link not found") + return + cis_link = self.cis_links[cis_handle] cis_link.state = CisLink.State.ESTABLISHED assert cis_link.acl_connection + # Update the CIS + cis_link.cig_sync_delay = cig_sync_delay + cis_link.cis_sync_delay = cis_sync_delay + cis_link.transport_latency_c_to_p = transport_latency_c_to_p + cis_link.transport_latency_p_to_c = transport_latency_p_to_c + cis_link.phy_c_to_p = hci.Phy(phy_c_to_p) + cis_link.phy_p_to_c = hci.Phy(phy_p_to_c) + cis_link.nse = nse + cis_link.bn_c_to_p = bn_c_to_p + cis_link.bn_p_to_c = bn_p_to_c + cis_link.ft_c_to_p = ft_c_to_p + cis_link.ft_p_to_c = ft_p_to_c + cis_link.max_pdu_c_to_p = max_pdu_c_to_p + cis_link.max_pdu_p_to_c = max_pdu_p_to_c + cis_link.iso_interval = iso_interval * 1.25 + logger.debug( f'*** CIS Establishment ' f'{cis_link.acl_connection.peer_address}, ' @@ -5956,16 +6083,27 @@ class Device(utils.CompositeEventEmitter): ) cis_link.emit(cis_link.EVENT_ESTABLISHMENT) + cis_link.acl_connection.emit( + cis_link.acl_connection.EVENT_CIS_ESTABLISHMENT, cis_link + ) self.emit(self.EVENT_CIS_ESTABLISHMENT, cis_link) # [LE only] @host_event_handler @utils.experimental('Only for testing') def on_cis_establishment_failure(self, cis_handle: int, status: int) -> None: + if (cis_link := self.cis_links.pop(cis_handle, None)) is None: + logger.warning("CIS link not found") + return + logger.debug(f'*** CIS Establishment Failure: cis=[0x{cis_handle:04X}] ***') - if cis_link := self.cis_links.pop(cis_handle): - cis_link.emit(cis_link.EVENT_ESTABLISHMENT_FAILURE, status) - self.emit(self.EVENT_CIS_ESTABLISHMENT_FAILURE, cis_handle, status) + cis_link.emit(cis_link.EVENT_ESTABLISHMENT_FAILURE, status) + cis_link.acl_connection.emit( + cis_link.acl_connection.EVENT_CIS_ESTABLISHMENT_FAILURE, + cis_link, + status, + ) + self.emit(self.EVENT_CIS_ESTABLISHMENT_FAILURE, cis_link, status) # [LE only] @host_event_handler @@ -6026,13 +6164,19 @@ class Device(utils.CompositeEventEmitter): @host_event_handler @with_connection_from_handle - def on_connection_parameters_update(self, connection, connection_parameters): + def on_connection_parameters_update( + self, connection: Connection, connection_parameters: core.ConnectionParameters + ): logger.debug( f'*** Connection Parameters Update: [0x{connection.handle:04X}] ' f'{connection.peer_address} as {connection.role_name}, ' f'{connection_parameters}' ) - connection.parameters = connection_parameters + connection.parameters = Connection.Parameters( + connection_parameters.connection_interval * 1.25, + connection_parameters.peripheral_latency, + connection_parameters.supervision_timeout * 10.0, + ) connection.emit(connection.EVENT_CONNECTION_PARAMETERS_UPDATE) @host_event_handler diff --git a/bumble/hci.py b/bumble/hci.py index de2d6c3..55c6448 100644 --- a/bumble/hci.py +++ b/bumble/hci.py @@ -5089,6 +5089,25 @@ class HCI_LE_Set_Default_Periodic_Advertising_Sync_Transfer_Parameters_Command( ] +# ----------------------------------------------------------------------------- +@HCI_Command.command +@dataclasses.dataclass +class HCI_LE_Read_ISO_TX_Sync_Command(HCI_Command): + ''' + See Bluetooth spec @ 7.8.96 LE Read ISO TX Sync command + ''' + + connection_handle: int = field(metadata=metadata(2)) + + return_parameters_fields = [ + ('status', STATUS_SPEC), + ('connection_handle', 2), + ('packet_sequence_number', 2), + ('tx_time_stamp', 4), + ('time_offset', 3), + ] + + # ----------------------------------------------------------------------------- @HCI_Command.command @dataclasses.dataclass @@ -7381,6 +7400,9 @@ class HCI_IsoDataPacket(HCI_Packet): iso_sdu_length: Optional[int] = None packet_status_flag: Optional[int] = None + def __post_init__(self) -> None: + self.ts_flag = self.time_stamp is not None + @staticmethod def from_bytes(packet: bytes) -> HCI_IsoDataPacket: time_stamp: Optional[int] = None @@ -7446,15 +7468,26 @@ class HCI_IsoDataPacket(HCI_Packet): return struct.pack(fmt, *args) + self.iso_sdu_fragment def __str__(self) -> str: - return ( + result = ( f'{color("ISO", "blue")}: ' f'handle=0x{self.connection_handle:04x}, ' f'pb={self.pb_flag}, ' - f'ps={self.packet_status_flag}, ' - f'data_total_length={self.data_total_length}, ' - f'sdu_fragment={self.iso_sdu_fragment.hex()}' + f'data_total_length={self.data_total_length}' ) + if self.ts_flag: + result += f', time_stamp={self.time_stamp}' + + if self.pb_flag in (0b00, 0b10): + result += ( + ', ' + f'packet_sequence_number={self.packet_sequence_number}, ' + f'ps={self.packet_status_flag}, ' + f'sdu_fragment={self.iso_sdu_fragment.hex()}' + ) + + return result + # ----------------------------------------------------------------------------- class HCI_AclDataPacketAssembler: diff --git a/bumble/host.py b/bumble/host.py index 400816a..817485a 100644 --- a/bumble/host.py +++ b/bumble/host.py @@ -38,7 +38,6 @@ from bumble.snoop import Snooper from bumble import drivers from bumble import hci from bumble.core import ( - PhysicalTransport, PhysicalTransport, ConnectionPHY, ConnectionParameters, @@ -72,6 +71,11 @@ class DataPacketQueue(utils.EventEmitter): max_packet_size: int + class PerConnectionState: + def __init__(self) -> None: + self.in_flight = 0 + self.drained = asyncio.Event() + def __init__( self, max_packet_size: int, @@ -82,9 +86,12 @@ class DataPacketQueue(utils.EventEmitter): self.max_packet_size = max_packet_size self.max_in_flight = max_in_flight self._in_flight = 0 # Total number of packets in flight across all connections - self._in_flight_per_connection: dict[int, int] = collections.defaultdict( - int - ) # Number of packets in flight per connection + self._connection_state: dict[int, DataPacketQueue.PerConnectionState] = ( + collections.defaultdict(DataPacketQueue.PerConnectionState) + ) + self._drained_per_connection: dict[int, asyncio.Event] = ( + collections.defaultdict(asyncio.Event) + ) self._send = send self._packets: collections.deque[tuple[hci.HCI_Packet, int]] = ( collections.deque() @@ -136,36 +143,40 @@ class DataPacketQueue(utils.EventEmitter): self._completed += flushed_count self._packets = collections.deque(packets_to_keep) - if connection_handle in self._in_flight_per_connection: - in_flight = self._in_flight_per_connection[connection_handle] + if connection_state := self._connection_state.pop(connection_handle, None): + in_flight = connection_state.in_flight self._completed += in_flight self._in_flight -= in_flight - del self._in_flight_per_connection[connection_handle] + connection_state.drained.set() def _check_queue(self) -> None: while self._packets and self._in_flight < self.max_in_flight: packet, connection_handle = self._packets.pop() self._send(packet) self._in_flight += 1 - self._in_flight_per_connection[connection_handle] += 1 + connection_state = self._connection_state[connection_handle] + connection_state.in_flight += 1 + connection_state.drained.clear() def on_packets_completed(self, packet_count: int, connection_handle: int) -> None: """Mark one or more packets associated with a connection as completed.""" - if connection_handle not in self._in_flight_per_connection: + if connection_handle not in self._connection_state: logger.warning( f'received completion for unknown connection {connection_handle}' ) return - in_flight_for_connection = self._in_flight_per_connection[connection_handle] - if packet_count <= in_flight_for_connection: - self._in_flight_per_connection[connection_handle] -= packet_count + connection_state = self._connection_state[connection_handle] + if packet_count <= connection_state.in_flight: + connection_state.in_flight -= packet_count else: logger.warning( f'{packet_count} completed for {connection_handle} ' - f'but only {in_flight_for_connection} in flight' + f'but only {connection_state.in_flight} in flight' ) - self._in_flight_per_connection[connection_handle] = 0 + connection_state.in_flight = 0 + if connection_state.in_flight == 0: + connection_state.drained.set() if packet_count <= self._in_flight: self._in_flight -= packet_count @@ -180,6 +191,13 @@ class DataPacketQueue(utils.EventEmitter): self._check_queue() self.emit('flow') + async def drain(self, connection_handle: int) -> None: + """Wait until there are no pending packets for a connection.""" + if not (connection_state := self._connection_state.get(connection_handle)): + raise ValueError('no such connection') + + await connection_state.drained.wait() + # ----------------------------------------------------------------------------- class Connection: @@ -1269,7 +1287,24 @@ class Host(utils.EventEmitter): self.cis_links[event.connection_handle] = IsoLink( handle=event.connection_handle, packet_queue=self.iso_packet_queue ) - self.emit('cis_establishment', event.connection_handle) + self.emit( + 'cis_establishment', + event.connection_handle, + event.cig_sync_delay, + event.cis_sync_delay, + event.transport_latency_c_to_p, + event.transport_latency_p_to_c, + event.phy_c_to_p, + event.phy_p_to_c, + event.nse, + event.bn_c_to_p, + event.bn_p_to_c, + event.ft_c_to_p, + event.ft_p_to_c, + event.max_pdu_c_to_p, + event.max_pdu_p_to_c, + event.iso_interval, + ) else: self.emit( 'cis_establishment_failure', event.connection_handle, event.status @@ -1372,6 +1407,10 @@ class Host(utils.EventEmitter): self.emit('role_change_failure', event.bd_addr, event.status) def on_hci_le_data_length_change_event(self, event): + if (connection := self.connections.get(event.connection_handle)) is None: + logger.warning('!!! DATA LENGTH CHANGE: unknown handle') + return + self.emit( 'connection_data_length_change', event.connection_handle, diff --git a/bumble/profiles/ascs.py b/bumble/profiles/ascs.py index 409d054..ded7897 100644 --- a/bumble/profiles/ascs.py +++ b/bumble/profiles/ascs.py @@ -343,22 +343,16 @@ class AseStateMachine(gatt.Characteristic): self.service.device.EVENT_CIS_ESTABLISHMENT, self.on_cis_establishment ) - def on_cis_request( - self, - acl_connection: device.Connection, - cis_handle: int, - cig_id: int, - cis_id: int, - ) -> None: + def on_cis_request(self, cis_link: device.CisLink) -> None: if ( - cig_id == self.cig_id - and cis_id == self.cis_id + cis_link.cig_id == self.cig_id + and cis_link.cis_id == self.cis_id and self.state == self.State.ENABLING ): utils.cancel_on_event( - acl_connection, + cis_link.acl_connection, 'flush', - self.service.device.accept_cis_request(cis_handle), + self.service.device.accept_cis_request(cis_link), ) def on_cis_establishment(self, cis_link: device.CisLink) -> None: diff --git a/examples/run_cig_setup.py b/examples/run_cig_setup.py index 75abb7c..04c9c40 100644 --- a/examples/run_cig_setup.py +++ b/examples/run_cig_setup.py @@ -20,7 +20,7 @@ import logging import sys import os from bumble import utils -from bumble.device import Device, Connection +from bumble.device import Device, CigParameters, CisLink, Connection from bumble.hci import ( OwnAddressType, ) @@ -61,27 +61,39 @@ async def main() -> None: devices[0].random_address, own_address_type=OwnAddressType.RANDOM ) - cid_ids = [2, 3] cis_handles = await devices[1].setup_cig( - cig_id=1, - cis_id=cid_ids, - sdu_interval=(10000, 255), - framing=0, - max_sdu=(120, 0), - retransmission_number=13, - max_transport_latency=(100, 5), + CigParameters( + cig_id=1, + cis_parameters=[ + CigParameters.CisParameters( + cis_id=2, + max_sdu_c_to_p=120, + max_sdu_p_to_c=0, + rtn_c_to_p=13, + rtn_p_to_c=13, + ), + CigParameters.CisParameters( + cis_id=3, + max_sdu_c_to_p=120, + max_sdu_p_to_c=0, + rtn_c_to_p=13, + rtn_p_to_c=13, + ), + ], + sdu_interval_c_to_p=10000, + sdu_interval_p_to_c=255, + framing=CigParameters.Framing.UNFRAMED, + max_transport_latency_c_to_p=100, + max_transport_latency_p_to_c=5, + ), ) - def on_cis_request( - connection: Connection, cis_handle: int, _cig_id: int, _cis_id: int - ): - connection.cancel_on_disconnection(devices[0].accept_cis_request(cis_handle)) + def on_cis_request(connection: Connection, cis_link: CisLink): + connection.cancel_on_disconnection(devices[0].accept_cis_request(cis_link)) devices[0].on('cis_request', on_cis_request) - cis_links = await devices[1].create_cis( - [(cis, connection.handle) for cis in cis_handles] - ) + cis_links = await devices[1].create_cis([(cis, connection) for cis in cis_handles]) for cis_link in cis_links: await cis_link.disconnect() diff --git a/examples/run_unicast_server.py b/examples/run_unicast_server.py index d316699..23e350f 100644 --- a/examples/run_unicast_server.py +++ b/examples/run_unicast_server.py @@ -77,7 +77,7 @@ file_outputs: dict[AseStateMachine, io.BufferedWriter] = {} # ----------------------------------------------------------------------------- async def main() -> None: if len(sys.argv) < 3: - print('Usage: run_cig_setup.py ' '') + print('Usage: run_cig_setup.py ') return print('<<< connecting to HCI...') @@ -149,12 +149,9 @@ async def main() -> None: sdu += pdu.iso_sdu_fragment file_outputs[ase].write(sdu) - def on_ase_state_change( - state: AseStateMachine.State, - ase: AseStateMachine, - ) -> None: - if state != AseStateMachine.State.STREAMING: - if file_output := file_outputs.pop(ase): + def on_ase_state_change(ase: AseStateMachine) -> None: + if ase.state != AseStateMachine.State.STREAMING: + if file_output := file_outputs.pop(ase, None): file_output.close() else: file_output = open(f'{datetime.datetime.now().isoformat()}.lc3', 'wb') diff --git a/extras/android/RemoteHCI/app/src/main/java/com/github/google/bumble/remotehci/HciProxy.java b/extras/android/RemoteHCI/app/src/main/java/com/github/google/bumble/remotehci/HciProxy.java index 0b177f2..5b6f995 100644 --- a/extras/android/RemoteHCI/app/src/main/java/com/github/google/bumble/remotehci/HciProxy.java +++ b/extras/android/RemoteHCI/app/src/main/java/com/github/google/bumble/remotehci/HciProxy.java @@ -1,5 +1,7 @@ package com.github.google.bumble.remotehci; +import static com.github.google.bumble.remotehci.HciPacket.Type.COMMAND; + import android.os.RemoteException; import android.util.Log; @@ -16,6 +18,10 @@ public class HciProxy { private int mAclPacketsSent; private int mScoPacketsSent; + private static final byte[] LOOPBACK_COMMAND_COMPLETE_EVENT = { + 0x0E, 0x04, 0x01, 0x77, (byte)0xFC, 0x00 + }; + HciProxy(int port, Listener listener) throws HalException { this.mListener = listener; @@ -84,6 +90,14 @@ public class HciProxy { @Override public void onPacket(HciPacket.Type type, byte[] packet) { + // Short-circuit a local response when a special latency-testing packet + // is received. + if (type == COMMAND && packet[0] == (byte)0x77 && packet[1] == (byte)0xFC) { + Log.d(TAG, "LOOPBACK"); + mServer.sendPacket(HciPacket.Type.EVENT, LOOPBACK_COMMAND_COMPLETE_EVENT); + return; + } + Log.d(TAG, String.format("HOST->CONTROLLER: type=%s, size=%d", type, packet.length)); hciHal.sendPacket(type, packet); diff --git a/extras/android/RemoteHCI/app/src/main/java/com/github/google/bumble/remotehci/HciServer.java b/extras/android/RemoteHCI/app/src/main/java/com/github/google/bumble/remotehci/HciServer.java index 9332305..393e478 100644 --- a/extras/android/RemoteHCI/app/src/main/java/com/github/google/bumble/remotehci/HciServer.java +++ b/extras/android/RemoteHCI/app/src/main/java/com/github/google/bumble/remotehci/HciServer.java @@ -12,7 +12,7 @@ import java.net.Socket; public class HciServer { private static final String TAG = "HciServer"; - private static final int BUFFER_SIZE = 1024; + private static final int BUFFER_SIZE = 8192; private final int mPort; private final Listener mListener; private OutputStream mOutputStream; diff --git a/tests/device_test.py b/tests/device_test.py index 3e390a8..5ddbd8a 100644 --- a/tests/device_test.py +++ b/tests/device_test.py @@ -29,6 +29,8 @@ from bumble.core import ( from bumble.device import ( AdvertisingEventProperties, AdvertisingParameters, + CigParameters, + CisLink, Connection, Device, PeriodicAdvertisingParameters, @@ -480,16 +482,13 @@ async def test_cis(): peripheral_cis_futures = {} - def on_cis_request( - acl_connection: Connection, - cis_handle: int, - _cig_id: int, - _cis_id: int, - ): - acl_connection.cancel_on_disconnection( - devices[1].accept_cis_request(cis_handle) + def on_cis_request(cis_link: CisLink): + cis_link.acl_connection.cancel_on_disconnection( + devices[1].accept_cis_request(cis_link), + ) + peripheral_cis_futures[cis_link.handle] = ( + asyncio.get_running_loop().create_future() ) - peripheral_cis_futures[cis_handle] = asyncio.get_running_loop().create_future() devices[1].on('cis_request', on_cis_request) devices[1].on( @@ -498,19 +497,21 @@ async def test_cis(): ) cis_handles = await devices[0].setup_cig( - cig_id=1, - cis_id=[2, 3], - sdu_interval=(0, 0), - framing=0, - max_sdu=(0, 0), - retransmission_number=0, - max_transport_latency=(0, 0), + CigParameters( + cig_id=1, + cis_parameters=[ + CigParameters.CisParameters(cis_id=2), + CigParameters.CisParameters(cis_id=3), + ], + sdu_interval_c_to_p=0, + sdu_interval_p_to_c=0, + ), ) assert len(cis_handles) == 2 cis_links = await devices[0].create_cis( [ - (cis_handles[0], devices.connections[0].handle), - (cis_handles[1], devices.connections[0].handle), + (cis_handles[0], devices.connections[0]), + (cis_handles[1], devices.connections[0]), ] ) await asyncio.gather(*peripheral_cis_futures.values()) @@ -528,32 +529,27 @@ async def test_cis_setup_failure(): cis_requests = asyncio.Queue() - def on_cis_request( - acl_connection: Connection, - cis_handle: int, - cig_id: int, - cis_id: int, - ): - del acl_connection, cig_id, cis_id - cis_requests.put_nowait(cis_handle) + def on_cis_request(cis_link: CisLink): + cis_requests.put_nowait(cis_link) devices[1].on('cis_request', on_cis_request) cis_handles = await devices[0].setup_cig( - cig_id=1, - cis_id=[2], - sdu_interval=(0, 0), - framing=0, - max_sdu=(0, 0), - retransmission_number=0, - max_transport_latency=(0, 0), + CigParameters( + cig_id=1, + cis_parameters=[ + CigParameters.CisParameters(cis_id=2), + ], + sdu_interval_c_to_p=0, + sdu_interval_p_to_c=0, + ), ) assert len(cis_handles) == 1 cis_create_task = asyncio.create_task( devices[0].create_cis( [ - (cis_handles[0], devices.connections[0].handle), + (cis_handles[0], devices.connections[0]), ] ) )