forked from auracaster/bumble_mirror
Compare commits
11 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| f8077d7996 | |||
| 739907fa31 | |||
| c98275f385 | |||
| f61fd64c0b | |||
| 5b33e715da | |||
| b885f29318 | |||
| 7ca13188d5 | |||
| 89586d5d18 | |||
| 381032ceb9 | |||
| 12ca1c01f0 | |||
| a7111d0107 |
@@ -6,6 +6,8 @@ on:
|
||||
branches: [ main ]
|
||||
pull_request:
|
||||
branches: [ main ]
|
||||
workflow_dispatch:
|
||||
branches: [main]
|
||||
|
||||
permissions:
|
||||
contents: read
|
||||
|
||||
@@ -17,6 +17,8 @@ on:
|
||||
pull_request:
|
||||
# The branches below must be a subset of the branches above
|
||||
branches: [ main ]
|
||||
workflow_dispatch:
|
||||
branches: [main]
|
||||
schedule:
|
||||
- cron: '39 21 * * 4'
|
||||
|
||||
|
||||
@@ -7,6 +7,10 @@ on:
|
||||
branches: [ main ]
|
||||
paths:
|
||||
- 'extras/android/BtBench/**'
|
||||
workflow_dispatch:
|
||||
branches: [main]
|
||||
paths:
|
||||
- 'extras/android/BtBench/**'
|
||||
|
||||
permissions:
|
||||
contents: read
|
||||
|
||||
@@ -5,6 +5,8 @@ on:
|
||||
branches: [ main ]
|
||||
pull_request:
|
||||
branches: [ main ]
|
||||
workflow_dispatch:
|
||||
branches: [main]
|
||||
|
||||
permissions:
|
||||
contents: read
|
||||
|
||||
@@ -6,6 +6,8 @@ on:
|
||||
branches: [ main ]
|
||||
pull_request:
|
||||
branches: [ main ]
|
||||
workflow_dispatch:
|
||||
branches: [main]
|
||||
|
||||
permissions:
|
||||
contents: read
|
||||
|
||||
Vendored
+4
-6
@@ -94,17 +94,15 @@
|
||||
"ycursor"
|
||||
],
|
||||
"[python]": {
|
||||
"editor.rulers": [88],
|
||||
"editor.defaultFormatter": "ms-python.black-formatter"
|
||||
"editor.rulers": [88]
|
||||
},
|
||||
"python.formatting.provider": "black",
|
||||
"pylint.importStrategy": "useBundled",
|
||||
"python.testing.pytestArgs": [
|
||||
"."
|
||||
],
|
||||
"python.testing.unittestEnabled": false,
|
||||
"python.testing.pytestEnabled": true,
|
||||
"python-envs.defaultEnvManager": "ms-python.python:system",
|
||||
"python-envs.pythonProjects": [],
|
||||
"python.terminal.launchArgs": [
|
||||
|
||||
]
|
||||
"python-envs.pythonProjects": []
|
||||
}
|
||||
|
||||
+21
-3
@@ -489,6 +489,21 @@ class Driver(common.Driver):
|
||||
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
async def get_loaded_firmware_version(host):
|
||||
response = await host.send_command(HCI_RTK_Read_ROM_Version_Command())
|
||||
|
||||
if response.return_parameters.status != hci.HCI_SUCCESS:
|
||||
return None
|
||||
|
||||
response = await host.send_command(
|
||||
hci.HCI_Read_Local_Version_Information_Command(), check_result=True
|
||||
)
|
||||
return (
|
||||
response.return_parameters.hci_subversion << 16
|
||||
| response.return_parameters.lmp_subversion
|
||||
)
|
||||
|
||||
@classmethod
|
||||
async def driver_info_for_host(cls, host):
|
||||
try:
|
||||
@@ -592,7 +607,7 @@ class Driver(common.Driver):
|
||||
)
|
||||
if response.return_parameters.status != hci.HCI_SUCCESS:
|
||||
logger.warning("can't get ROM version")
|
||||
return
|
||||
return None
|
||||
rom_version = response.return_parameters.version
|
||||
logger.debug(f"ROM version before download: {rom_version:04X}")
|
||||
else:
|
||||
@@ -600,13 +615,14 @@ class Driver(common.Driver):
|
||||
|
||||
firmware = Firmware(self.firmware)
|
||||
logger.debug(f"firmware: project_id=0x{firmware.project_id:04X}")
|
||||
logger.debug(f"firmware: version=0x{firmware.version:04X}")
|
||||
for patch in firmware.patches:
|
||||
if patch[0] == rom_version + 1:
|
||||
logger.debug(f"using patch {patch[0]}")
|
||||
break
|
||||
else:
|
||||
logger.warning("no valid patch found for rom version {rom_version}")
|
||||
return
|
||||
return None
|
||||
|
||||
# Append the config if there is one.
|
||||
if self.config:
|
||||
@@ -642,7 +658,9 @@ class Driver(common.Driver):
|
||||
logger.warning("can't get ROM version")
|
||||
else:
|
||||
rom_version = response.return_parameters.version
|
||||
logger.debug(f"ROM version after download: {rom_version:04X}")
|
||||
logger.debug(f"ROM version after download: {rom_version:02X}")
|
||||
|
||||
return firmware.version
|
||||
|
||||
async def download_firmware(self):
|
||||
if self.driver_info.rom == RTK_ROM_LMP_8723A:
|
||||
|
||||
@@ -452,6 +452,16 @@ class AseStateMachine(gatt.Characteristic):
|
||||
|
||||
self.metadata = le_audio.Metadata.from_bytes(metadata)
|
||||
self.state = self.State.ENABLING
|
||||
# CIS could be established before enable.
|
||||
if cis_link := next(
|
||||
(
|
||||
cis_link
|
||||
for cis_link in self.service.device.cis_links.values()
|
||||
if cis_link.cig_id == self.cig_id and cis_link.cis_id == self.cis_id
|
||||
),
|
||||
None,
|
||||
):
|
||||
self.on_cis_establishment(cis_link)
|
||||
|
||||
return (AseResponseCode.SUCCESS, AseReasonCode.NONE)
|
||||
|
||||
|
||||
+6
-5
@@ -1571,11 +1571,12 @@ class Session:
|
||||
if self.pairing_method == PairingMethod.CTKD_OVER_CLASSIC:
|
||||
# Authentication is already done in SMP, so remote shall start keys distribution immediately
|
||||
return
|
||||
elif self.sc:
|
||||
|
||||
if self.sc:
|
||||
self.send_public_key_command()
|
||||
|
||||
if self.pairing_method == PairingMethod.PASSKEY:
|
||||
self.display_or_input_passkey()
|
||||
|
||||
self.send_public_key_command()
|
||||
else:
|
||||
if self.pairing_method == PairingMethod.PASSKEY:
|
||||
self.display_or_input_passkey(self.send_pairing_confirm_command)
|
||||
@@ -1848,10 +1849,10 @@ class Session:
|
||||
elif self.pairing_method == PairingMethod.PASSKEY:
|
||||
self.send_pairing_confirm_command()
|
||||
else:
|
||||
# Send our public key back to the initiator
|
||||
self.send_public_key_command()
|
||||
|
||||
def next_steps() -> None:
|
||||
# Send our public key back to the initiator
|
||||
self.send_public_key_command()
|
||||
|
||||
if self.pairing_method in (
|
||||
PairingMethod.JUST_WORKS,
|
||||
|
||||
+107
-9
@@ -16,7 +16,6 @@
|
||||
# Imports
|
||||
# -----------------------------------------------------------------------------
|
||||
import asyncio
|
||||
import os
|
||||
import functools
|
||||
import pytest
|
||||
import logging
|
||||
@@ -55,7 +54,7 @@ from bumble.profiles.pacs import (
|
||||
PublishedAudioCapabilitiesServiceProxy,
|
||||
)
|
||||
from bumble.profiles.le_audio import Metadata
|
||||
from tests.test_utils import TwoDevices
|
||||
from tests.test_utils import TwoDevices, async_barrier
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@@ -441,15 +440,114 @@ async def test_ascs():
|
||||
assert (await notifications[1].get())[:2] == bytes([1, AseStateMachine.State.IDLE])
|
||||
assert (await notifications[2].get())[:2] == bytes([2, AseStateMachine.State.IDLE])
|
||||
|
||||
await asyncio.sleep(0.001)
|
||||
await async_barrier()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def run():
|
||||
await test_pacs()
|
||||
@pytest.mark.asyncio
|
||||
async def test_ascs_enable_source_then_sink():
|
||||
devices = TwoDevices()
|
||||
ascs_server = AudioStreamControlService(
|
||||
device=devices[1], sink_ase_id=[1], source_ase_id=[2]
|
||||
)
|
||||
sink_ase = ascs_server.ase_state_machines[1]
|
||||
source_ase = ascs_server.ase_state_machines[2]
|
||||
devices[1].add_service(ascs_server)
|
||||
condition = asyncio.Condition()
|
||||
|
||||
async def on_state_change():
|
||||
async with condition:
|
||||
condition.notify_all()
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
if __name__ == '__main__':
|
||||
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
|
||||
asyncio.run(run())
|
||||
sink_ase.on(sink_ase.EVENT_STATE_CHANGE, on_state_change)
|
||||
source_ase.on(sink_ase.EVENT_STATE_CHANGE, on_state_change)
|
||||
|
||||
await devices.setup_connection()
|
||||
peer = device.Peer(devices.connections[0])
|
||||
ascs_client = await peer.discover_service_and_create_proxy(
|
||||
AudioStreamControlServiceProxy
|
||||
)
|
||||
|
||||
# Config Codec
|
||||
config = CodecSpecificConfiguration(
|
||||
sampling_frequency=SamplingFrequency.FREQ_48000,
|
||||
frame_duration=FrameDuration.DURATION_10000_US,
|
||||
audio_channel_allocation=AudioLocation.FRONT_LEFT,
|
||||
octets_per_codec_frame=120,
|
||||
codec_frames_per_sdu=1,
|
||||
)
|
||||
await ascs_client.ase_control_point.write_value(
|
||||
ASE_Config_Codec(
|
||||
ase_id=[1, 2],
|
||||
target_latency=[3, 4],
|
||||
target_phy=[5, 6],
|
||||
codec_id=[CodingFormat(CodecID.LC3), CodingFormat(CodecID.LC3)],
|
||||
codec_specific_configuration=[config, config],
|
||||
)
|
||||
)
|
||||
async with condition:
|
||||
await condition.wait_for(
|
||||
lambda: (
|
||||
sink_ase.state == AseStateMachine.State.CODEC_CONFIGURED
|
||||
and source_ase.state == AseStateMachine.State.CODEC_CONFIGURED
|
||||
)
|
||||
)
|
||||
|
||||
# Config QOS
|
||||
await ascs_client.ase_control_point.write_value(
|
||||
ASE_Config_QOS(
|
||||
ase_id=[1, 2],
|
||||
cig_id=[1, 1],
|
||||
cis_id=[1, 1],
|
||||
sdu_interval=[100, 100],
|
||||
framing=[0, 0],
|
||||
phy=[1, 1],
|
||||
max_sdu=[100, 100],
|
||||
retransmission_number=[16, 16],
|
||||
max_transport_latency=[150, 150],
|
||||
presentation_delay=[10, 10],
|
||||
)
|
||||
)
|
||||
async with condition:
|
||||
await condition.wait_for(
|
||||
lambda: (
|
||||
sink_ase.state == AseStateMachine.State.QOS_CONFIGURED
|
||||
and source_ase.state == AseStateMachine.State.QOS_CONFIGURED
|
||||
)
|
||||
)
|
||||
|
||||
# Enable ASE 2
|
||||
await ascs_client.ase_control_point.write_value(
|
||||
ASE_Enable(ase_id=[2], metadata=[b'foo'])
|
||||
)
|
||||
await async_barrier()
|
||||
cis_handles = await devices[0].setup_cig(
|
||||
device.CigParameters(
|
||||
cig_id=1,
|
||||
cis_parameters=[device.CigParameters.CisParameters(cis_id=1)],
|
||||
sdu_interval_c_to_p=100,
|
||||
sdu_interval_p_to_c=100,
|
||||
)
|
||||
)
|
||||
await devices[0].create_cis([(cis_handles[0], devices.connections[0])])
|
||||
|
||||
async with condition:
|
||||
await condition.wait_for(
|
||||
lambda: (source_ase.state == AseStateMachine.State.ENABLING)
|
||||
)
|
||||
await ascs_client.ase_control_point.write_value(
|
||||
ASE_Receiver_Start_Ready(ase_id=[2])
|
||||
)
|
||||
async with condition:
|
||||
await condition.wait_for(
|
||||
lambda: (source_ase.state == AseStateMachine.State.STREAMING)
|
||||
)
|
||||
|
||||
# Enable ASE 1
|
||||
await ascs_client.ase_control_point.write_value(
|
||||
ASE_Enable(ase_id=[1], metadata=[b'bar'])
|
||||
)
|
||||
async with condition:
|
||||
await condition.wait_for(
|
||||
lambda: (sink_ase.state == AseStateMachine.State.STREAMING)
|
||||
)
|
||||
|
||||
+3
-1
@@ -322,7 +322,9 @@ async def test_self_smp(io_caps, sc, mitm, key_dist):
|
||||
return 0
|
||||
else:
|
||||
if (
|
||||
self.peer_delegate.io_capability
|
||||
self.io_capability
|
||||
== PairingDelegate.IoCapability.KEYBOARD_INPUT_ONLY
|
||||
and self.peer_delegate.io_capability
|
||||
== PairingDelegate.IoCapability.KEYBOARD_INPUT_ONLY
|
||||
):
|
||||
peer_number = 6789
|
||||
|
||||
@@ -43,7 +43,8 @@ LINUX_KERNEL_GIT_SOURCE = "https://git.kernel.org/pub/scm/linux/kernel/git/firmw
|
||||
# -----------------------------------------------------------------------------
|
||||
def download_file(base_url, name):
|
||||
url = f"{base_url}/{name}"
|
||||
with urllib.request.urlopen(url) as file:
|
||||
request = urllib.request.Request(url, data=None, headers={"User-Agent": "Bumble"})
|
||||
with urllib.request.urlopen(request) as file:
|
||||
data = file.read()
|
||||
print(f"Downloaded {name}: {len(data)} bytes")
|
||||
return data
|
||||
|
||||
@@ -25,6 +25,7 @@ import click
|
||||
from bumble.colors import color
|
||||
from bumble.drivers import rtk
|
||||
from bumble.tools import rtk_util
|
||||
import bumble.logging
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@@ -58,7 +59,9 @@ def download_file(base_url, name, remove_suffix):
|
||||
name = name.replace(".bin", "")
|
||||
|
||||
url = f"{base_url}/{name}"
|
||||
with urllib.request.urlopen(url) as file:
|
||||
logger.debug(f"downloading {url}")
|
||||
request = urllib.request.Request(url, data=None, headers={"User-Agent": "Bumble"})
|
||||
with urllib.request.urlopen(request) as file:
|
||||
data = file.read()
|
||||
print(f"Downloaded {name}: {len(data)} bytes")
|
||||
return data
|
||||
@@ -84,6 +87,7 @@ def download_file(base_url, name, remove_suffix):
|
||||
@click.option("--parse", is_flag=True, help="Parse the FW image after saving")
|
||||
def main(output_dir, source, single, force, parse):
|
||||
"""Download RTK firmware images and configs."""
|
||||
bumble.logging.setup_basic_logging()
|
||||
|
||||
# Check that the output dir exists
|
||||
if output_dir == '':
|
||||
|
||||
+22
-4
@@ -20,7 +20,7 @@ import logging
|
||||
|
||||
import click
|
||||
|
||||
from bumble import transport
|
||||
from bumble import company_ids, hci, transport
|
||||
from bumble.host import Host
|
||||
from bumble.drivers import rtk
|
||||
import bumble.logging
|
||||
@@ -62,10 +62,22 @@ async def do_load(usb_transport, force):
|
||||
# Get the driver.
|
||||
driver = await rtk.Driver.for_host(host, force)
|
||||
if driver is None:
|
||||
print("Firmware already loaded or no supported driver for this device.")
|
||||
# Try to see if there's already a FW image loaded
|
||||
firmware_version = await rtk.Driver.get_loaded_firmware_version(host)
|
||||
if firmware_version is None:
|
||||
print("Device not supported")
|
||||
return
|
||||
|
||||
print(f"Firmware already loaded: 0x{firmware_version:08X}")
|
||||
return
|
||||
|
||||
await driver.download_firmware()
|
||||
firmware_version = await driver.download_firmware()
|
||||
|
||||
if firmware_version is None:
|
||||
print("Failed to load firmware")
|
||||
return
|
||||
|
||||
print(f"Loaded firmware version 0x{firmware_version:08X}")
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@@ -107,7 +119,13 @@ async def do_info(usb_transport, force):
|
||||
f" Config: {driver_info.config_name}\n"
|
||||
)
|
||||
else:
|
||||
print("Firmware already loaded or no supported driver for this device.")
|
||||
# Try to see if there's already a FW image loaded
|
||||
firmware_version = await rtk.Driver.get_loaded_firmware_version(host)
|
||||
if firmware_version is None:
|
||||
print("Device not supported")
|
||||
return
|
||||
|
||||
print(f"Firmware loaded: 0x{firmware_version:08X}")
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
Reference in New Issue
Block a user