Compare commits

...

11 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod f8077d7996 use user-agent header with intel FW downloader 2025-08-08 18:02:33 -07:00
Gilles Boccon-Gibod 739907fa31 rtk: print info when fw is already loaded 2025-08-08 18:02:33 -07:00
zxzxwu c98275f385 Merge pull request #743 from zxzxwu/ascs
ASCS: Handle when CIS link is established before enable
2025-08-06 12:18:52 +08:00
Josh Wu f61fd64c0b ASCS: Handle when CIS link is established before enable 2025-08-05 17:31:42 +08:00
Gilles Boccon-Gibod 5b33e715da Merge pull request #742 from barbibulle/gbg/enable-manual-workflow-run 2025-08-04 20:57:23 -07:00
Gilles Boccon-Gibod b885f29318 Merge pull request #740 from barbibulle/gbg/fix-735 2025-08-04 20:57:04 -07:00
Gilles Boccon-Gibod 7ca13188d5 Merge pull request #741 from barbibulle/gbg/update-black 2025-08-04 20:56:40 -07:00
Gilles Boccon-Gibod 89586d5d18 enable manual workflow runs 2025-08-04 19:46:04 -07:00
Gilles Boccon-Gibod 381032ceb9 update to black 25.1 2025-08-04 19:32:52 -07:00
Gilles Boccon-Gibod 12ca1c01f0 Revert "update to black formatter 25.1"
This reverts commit c034297bc0.
2025-08-04 19:24:30 -07:00
Gilles Boccon-Gibod a7111d0107 send public keys earlier 2025-08-04 19:18:12 -07:00
14 changed files with 192 additions and 30 deletions
+2
View File
@@ -6,6 +6,8 @@ on:
branches: [ main ]
pull_request:
branches: [ main ]
workflow_dispatch:
branches: [main]
permissions:
contents: read
+2
View File
@@ -17,6 +17,8 @@ on:
pull_request:
# The branches below must be a subset of the branches above
branches: [ main ]
workflow_dispatch:
branches: [main]
schedule:
- cron: '39 21 * * 4'
+4
View File
@@ -7,6 +7,10 @@ on:
branches: [ main ]
paths:
- 'extras/android/BtBench/**'
workflow_dispatch:
branches: [main]
paths:
- 'extras/android/BtBench/**'
permissions:
contents: read
+2
View File
@@ -5,6 +5,8 @@ on:
branches: [ main ]
pull_request:
branches: [ main ]
workflow_dispatch:
branches: [main]
permissions:
contents: read
+2
View File
@@ -6,6 +6,8 @@ on:
branches: [ main ]
pull_request:
branches: [ main ]
workflow_dispatch:
branches: [main]
permissions:
contents: read
+4 -6
View File
@@ -94,17 +94,15 @@
"ycursor"
],
"[python]": {
"editor.rulers": [88],
"editor.defaultFormatter": "ms-python.black-formatter"
"editor.rulers": [88]
},
"python.formatting.provider": "black",
"pylint.importStrategy": "useBundled",
"python.testing.pytestArgs": [
"."
],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true,
"python-envs.defaultEnvManager": "ms-python.python:system",
"python-envs.pythonProjects": [],
"python.terminal.launchArgs": [
]
"python-envs.pythonProjects": []
}
+21 -3
View File
@@ -489,6 +489,21 @@ class Driver(common.Driver):
return True
@staticmethod
async def get_loaded_firmware_version(host):
response = await host.send_command(HCI_RTK_Read_ROM_Version_Command())
if response.return_parameters.status != hci.HCI_SUCCESS:
return None
response = await host.send_command(
hci.HCI_Read_Local_Version_Information_Command(), check_result=True
)
return (
response.return_parameters.hci_subversion << 16
| response.return_parameters.lmp_subversion
)
@classmethod
async def driver_info_for_host(cls, host):
try:
@@ -592,7 +607,7 @@ class Driver(common.Driver):
)
if response.return_parameters.status != hci.HCI_SUCCESS:
logger.warning("can't get ROM version")
return
return None
rom_version = response.return_parameters.version
logger.debug(f"ROM version before download: {rom_version:04X}")
else:
@@ -600,13 +615,14 @@ class Driver(common.Driver):
firmware = Firmware(self.firmware)
logger.debug(f"firmware: project_id=0x{firmware.project_id:04X}")
logger.debug(f"firmware: version=0x{firmware.version:04X}")
for patch in firmware.patches:
if patch[0] == rom_version + 1:
logger.debug(f"using patch {patch[0]}")
break
else:
logger.warning("no valid patch found for rom version {rom_version}")
return
return None
# Append the config if there is one.
if self.config:
@@ -642,7 +658,9 @@ class Driver(common.Driver):
logger.warning("can't get ROM version")
else:
rom_version = response.return_parameters.version
logger.debug(f"ROM version after download: {rom_version:04X}")
logger.debug(f"ROM version after download: {rom_version:02X}")
return firmware.version
async def download_firmware(self):
if self.driver_info.rom == RTK_ROM_LMP_8723A:
+10
View File
@@ -452,6 +452,16 @@ class AseStateMachine(gatt.Characteristic):
self.metadata = le_audio.Metadata.from_bytes(metadata)
self.state = self.State.ENABLING
# CIS could be established before enable.
if cis_link := next(
(
cis_link
for cis_link in self.service.device.cis_links.values()
if cis_link.cig_id == self.cig_id and cis_link.cis_id == self.cis_id
),
None,
):
self.on_cis_establishment(cis_link)
return (AseResponseCode.SUCCESS, AseReasonCode.NONE)
+6 -5
View File
@@ -1571,11 +1571,12 @@ class Session:
if self.pairing_method == PairingMethod.CTKD_OVER_CLASSIC:
# Authentication is already done in SMP, so remote shall start keys distribution immediately
return
elif self.sc:
if self.sc:
self.send_public_key_command()
if self.pairing_method == PairingMethod.PASSKEY:
self.display_or_input_passkey()
self.send_public_key_command()
else:
if self.pairing_method == PairingMethod.PASSKEY:
self.display_or_input_passkey(self.send_pairing_confirm_command)
@@ -1848,10 +1849,10 @@ class Session:
elif self.pairing_method == PairingMethod.PASSKEY:
self.send_pairing_confirm_command()
else:
# Send our public key back to the initiator
self.send_public_key_command()
def next_steps() -> None:
# Send our public key back to the initiator
self.send_public_key_command()
if self.pairing_method in (
PairingMethod.JUST_WORKS,
+107 -9
View File
@@ -16,7 +16,6 @@
# Imports
# -----------------------------------------------------------------------------
import asyncio
import os
import functools
import pytest
import logging
@@ -55,7 +54,7 @@ from bumble.profiles.pacs import (
PublishedAudioCapabilitiesServiceProxy,
)
from bumble.profiles.le_audio import Metadata
from tests.test_utils import TwoDevices
from tests.test_utils import TwoDevices, async_barrier
# -----------------------------------------------------------------------------
@@ -441,15 +440,114 @@ async def test_ascs():
assert (await notifications[1].get())[:2] == bytes([1, AseStateMachine.State.IDLE])
assert (await notifications[2].get())[:2] == bytes([2, AseStateMachine.State.IDLE])
await asyncio.sleep(0.001)
await async_barrier()
# -----------------------------------------------------------------------------
async def run():
await test_pacs()
@pytest.mark.asyncio
async def test_ascs_enable_source_then_sink():
devices = TwoDevices()
ascs_server = AudioStreamControlService(
device=devices[1], sink_ase_id=[1], source_ase_id=[2]
)
sink_ase = ascs_server.ase_state_machines[1]
source_ase = ascs_server.ase_state_machines[2]
devices[1].add_service(ascs_server)
condition = asyncio.Condition()
async def on_state_change():
async with condition:
condition.notify_all()
# -----------------------------------------------------------------------------
if __name__ == '__main__':
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
asyncio.run(run())
sink_ase.on(sink_ase.EVENT_STATE_CHANGE, on_state_change)
source_ase.on(sink_ase.EVENT_STATE_CHANGE, on_state_change)
await devices.setup_connection()
peer = device.Peer(devices.connections[0])
ascs_client = await peer.discover_service_and_create_proxy(
AudioStreamControlServiceProxy
)
# Config Codec
config = CodecSpecificConfiguration(
sampling_frequency=SamplingFrequency.FREQ_48000,
frame_duration=FrameDuration.DURATION_10000_US,
audio_channel_allocation=AudioLocation.FRONT_LEFT,
octets_per_codec_frame=120,
codec_frames_per_sdu=1,
)
await ascs_client.ase_control_point.write_value(
ASE_Config_Codec(
ase_id=[1, 2],
target_latency=[3, 4],
target_phy=[5, 6],
codec_id=[CodingFormat(CodecID.LC3), CodingFormat(CodecID.LC3)],
codec_specific_configuration=[config, config],
)
)
async with condition:
await condition.wait_for(
lambda: (
sink_ase.state == AseStateMachine.State.CODEC_CONFIGURED
and source_ase.state == AseStateMachine.State.CODEC_CONFIGURED
)
)
# Config QOS
await ascs_client.ase_control_point.write_value(
ASE_Config_QOS(
ase_id=[1, 2],
cig_id=[1, 1],
cis_id=[1, 1],
sdu_interval=[100, 100],
framing=[0, 0],
phy=[1, 1],
max_sdu=[100, 100],
retransmission_number=[16, 16],
max_transport_latency=[150, 150],
presentation_delay=[10, 10],
)
)
async with condition:
await condition.wait_for(
lambda: (
sink_ase.state == AseStateMachine.State.QOS_CONFIGURED
and source_ase.state == AseStateMachine.State.QOS_CONFIGURED
)
)
# Enable ASE 2
await ascs_client.ase_control_point.write_value(
ASE_Enable(ase_id=[2], metadata=[b'foo'])
)
await async_barrier()
cis_handles = await devices[0].setup_cig(
device.CigParameters(
cig_id=1,
cis_parameters=[device.CigParameters.CisParameters(cis_id=1)],
sdu_interval_c_to_p=100,
sdu_interval_p_to_c=100,
)
)
await devices[0].create_cis([(cis_handles[0], devices.connections[0])])
async with condition:
await condition.wait_for(
lambda: (source_ase.state == AseStateMachine.State.ENABLING)
)
await ascs_client.ase_control_point.write_value(
ASE_Receiver_Start_Ready(ase_id=[2])
)
async with condition:
await condition.wait_for(
lambda: (source_ase.state == AseStateMachine.State.STREAMING)
)
# Enable ASE 1
await ascs_client.ase_control_point.write_value(
ASE_Enable(ase_id=[1], metadata=[b'bar'])
)
async with condition:
await condition.wait_for(
lambda: (sink_ase.state == AseStateMachine.State.STREAMING)
)
+3 -1
View File
@@ -322,7 +322,9 @@ async def test_self_smp(io_caps, sc, mitm, key_dist):
return 0
else:
if (
self.peer_delegate.io_capability
self.io_capability
== PairingDelegate.IoCapability.KEYBOARD_INPUT_ONLY
and self.peer_delegate.io_capability
== PairingDelegate.IoCapability.KEYBOARD_INPUT_ONLY
):
peer_number = 6789
+2 -1
View File
@@ -43,7 +43,8 @@ LINUX_KERNEL_GIT_SOURCE = "https://git.kernel.org/pub/scm/linux/kernel/git/firmw
# -----------------------------------------------------------------------------
def download_file(base_url, name):
url = f"{base_url}/{name}"
with urllib.request.urlopen(url) as file:
request = urllib.request.Request(url, data=None, headers={"User-Agent": "Bumble"})
with urllib.request.urlopen(request) as file:
data = file.read()
print(f"Downloaded {name}: {len(data)} bytes")
return data
+5 -1
View File
@@ -25,6 +25,7 @@ import click
from bumble.colors import color
from bumble.drivers import rtk
from bumble.tools import rtk_util
import bumble.logging
# -----------------------------------------------------------------------------
@@ -58,7 +59,9 @@ def download_file(base_url, name, remove_suffix):
name = name.replace(".bin", "")
url = f"{base_url}/{name}"
with urllib.request.urlopen(url) as file:
logger.debug(f"downloading {url}")
request = urllib.request.Request(url, data=None, headers={"User-Agent": "Bumble"})
with urllib.request.urlopen(request) as file:
data = file.read()
print(f"Downloaded {name}: {len(data)} bytes")
return data
@@ -84,6 +87,7 @@ def download_file(base_url, name, remove_suffix):
@click.option("--parse", is_flag=True, help="Parse the FW image after saving")
def main(output_dir, source, single, force, parse):
"""Download RTK firmware images and configs."""
bumble.logging.setup_basic_logging()
# Check that the output dir exists
if output_dir == '':
+22 -4
View File
@@ -20,7 +20,7 @@ import logging
import click
from bumble import transport
from bumble import company_ids, hci, transport
from bumble.host import Host
from bumble.drivers import rtk
import bumble.logging
@@ -62,10 +62,22 @@ async def do_load(usb_transport, force):
# Get the driver.
driver = await rtk.Driver.for_host(host, force)
if driver is None:
print("Firmware already loaded or no supported driver for this device.")
# Try to see if there's already a FW image loaded
firmware_version = await rtk.Driver.get_loaded_firmware_version(host)
if firmware_version is None:
print("Device not supported")
return
print(f"Firmware already loaded: 0x{firmware_version:08X}")
return
await driver.download_firmware()
firmware_version = await driver.download_firmware()
if firmware_version is None:
print("Failed to load firmware")
return
print(f"Loaded firmware version 0x{firmware_version:08X}")
# -----------------------------------------------------------------------------
@@ -107,7 +119,13 @@ async def do_info(usb_transport, force):
f" Config: {driver_info.config_name}\n"
)
else:
print("Firmware already loaded or no supported driver for this device.")
# Try to see if there's already a FW image loaded
firmware_version = await rtk.Driver.get_loaded_firmware_version(host)
if firmware_version is None:
print("Device not supported")
return
print(f"Firmware loaded: 0x{firmware_version:08X}")
# -----------------------------------------------------------------------------