mirror of
https://github.com/google/bumble.git
synced 2026-06-02 07:47:03 +00:00
Merge pull request #583 from google/gbg/more-gatt-tests
regression test for GATT unsubscription
This commit is contained in:
@@ -0,0 +1,130 @@
|
||||
# Copyright 2024 Google LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# https://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Imports
|
||||
# -----------------------------------------------------------------------------
|
||||
import logging
|
||||
import pathlib
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
|
||||
import click
|
||||
|
||||
from bumble.colors import color
|
||||
from bumble.drivers import intel
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Logging
|
||||
# -----------------------------------------------------------------------------
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Constants
|
||||
# -----------------------------------------------------------------------------
|
||||
LINUX_KERNEL_GIT_SOURCE = "https://git.kernel.org/pub/scm/linux/kernel/git/firmware/linux-firmware.git/plain/intel"
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Functions
|
||||
# -----------------------------------------------------------------------------
|
||||
def download_file(base_url, name):
|
||||
url = f"{base_url}/{name}"
|
||||
with urllib.request.urlopen(url) as file:
|
||||
data = file.read()
|
||||
print(f"Downloaded {name}: {len(data)} bytes")
|
||||
return data
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@click.command
|
||||
@click.option(
|
||||
"--output-dir",
|
||||
default="",
|
||||
help="Output directory where the files will be saved. Defaults to the OS-specific"
|
||||
"app data dir, which the driver will check when trying to find firmware",
|
||||
show_default=True,
|
||||
)
|
||||
@click.option(
|
||||
"--source",
|
||||
type=click.Choice(["linux-kernel"]),
|
||||
default="linux-kernel",
|
||||
show_default=True,
|
||||
)
|
||||
@click.option("--single", help="Only download a single image set, by its base name")
|
||||
@click.option("--force", is_flag=True, help="Overwrite files if they already exist")
|
||||
def main(output_dir, source, single, force):
|
||||
"""Download Intel firmware images and configs."""
|
||||
|
||||
# Check that the output dir exists
|
||||
if output_dir == '':
|
||||
output_dir = intel.intel_firmware_dir()
|
||||
else:
|
||||
output_dir = pathlib.Path(output_dir)
|
||||
if not output_dir.is_dir():
|
||||
print("Output dir does not exist or is not a directory")
|
||||
return
|
||||
|
||||
base_url = {
|
||||
"linux-kernel": LINUX_KERNEL_GIT_SOURCE,
|
||||
}[source]
|
||||
|
||||
print("Downloading")
|
||||
print(color("FROM:", "green"), base_url)
|
||||
print(color("TO:", "green"), output_dir)
|
||||
|
||||
if single:
|
||||
images = [(f"{single}.sfi", f"{single}.ddc")]
|
||||
else:
|
||||
images = [
|
||||
(f"{base_name}.sfi", f"{base_name}.ddc")
|
||||
for base_name in intel.INTEL_FW_IMAGE_NAMES
|
||||
]
|
||||
|
||||
for fw_name, config_name in images:
|
||||
print(color("---", "yellow"))
|
||||
fw_image_out = output_dir / fw_name
|
||||
if not force and fw_image_out.exists():
|
||||
print(color(f"{fw_image_out} already exists, skipping", "red"))
|
||||
continue
|
||||
if config_name:
|
||||
config_image_out = output_dir / config_name
|
||||
if not force and config_image_out.exists():
|
||||
print(color("f{config_image_out} already exists, skipping", "red"))
|
||||
continue
|
||||
|
||||
try:
|
||||
fw_image = download_file(base_url, fw_name)
|
||||
except urllib.error.HTTPError as error:
|
||||
print(f"Failed to download {fw_name}: {error}")
|
||||
continue
|
||||
|
||||
config_image = None
|
||||
if config_name:
|
||||
try:
|
||||
config_image = download_file(base_url, config_name)
|
||||
except urllib.error.HTTPError as error:
|
||||
print(f"Failed to download {config_name}: {error}")
|
||||
continue
|
||||
|
||||
fw_image_out.write_bytes(fw_image)
|
||||
if config_image:
|
||||
config_image_out.write_bytes(config_image)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
@@ -0,0 +1,146 @@
|
||||
# Copyright 2024 Google LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# https://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Imports
|
||||
# -----------------------------------------------------------------------------
|
||||
import logging
|
||||
import asyncio
|
||||
import os
|
||||
from typing import Any
|
||||
|
||||
import click
|
||||
|
||||
from bumble.colors import color
|
||||
from bumble import transport
|
||||
from bumble.drivers import intel
|
||||
from bumble.host import Host
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Logging
|
||||
# -----------------------------------------------------------------------------
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def print_device_info(device_info: dict[intel.ValueType, Any]) -> None:
|
||||
if (mode := device_info.get(intel.ValueType.CURRENT_MODE_OF_OPERATION)) is not None:
|
||||
print(
|
||||
color("MODE:", "yellow"),
|
||||
mode.name,
|
||||
)
|
||||
print(color("DETAILS:", "yellow"))
|
||||
for key, value in device_info.items():
|
||||
print(f" {color(key.name, 'green')}: {value}")
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def do_info(usb_transport, force):
|
||||
async with await transport.open_transport(usb_transport) as (
|
||||
hci_source,
|
||||
hci_sink,
|
||||
):
|
||||
# Create a host to communicate with the device
|
||||
host = Host(hci_source, hci_sink)
|
||||
|
||||
# Create a driver
|
||||
driver = await intel.Driver.for_host(host, force)
|
||||
|
||||
# Get and print the device info
|
||||
print_device_info(await driver.read_device_info())
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def do_load(usb_transport, force):
|
||||
async with await transport.open_transport(usb_transport) as (
|
||||
hci_source,
|
||||
hci_sink,
|
||||
):
|
||||
# Create a host to communicate with the device
|
||||
host = Host(hci_source, hci_sink)
|
||||
|
||||
# Create a driver
|
||||
driver = await intel.Driver.for_host(host, force)
|
||||
|
||||
# Reboot in bootloader mode
|
||||
await driver.load_firmware()
|
||||
|
||||
# Get and print the device info
|
||||
print_device_info(await driver.read_device_info())
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def do_bootloader(usb_transport, force):
|
||||
async with await transport.open_transport(usb_transport) as (
|
||||
hci_source,
|
||||
hci_sink,
|
||||
):
|
||||
# Create a host to communicate with the device
|
||||
host = Host(hci_source, hci_sink)
|
||||
|
||||
# Create a driver
|
||||
driver = await intel.Driver.for_host(host, force)
|
||||
|
||||
# Reboot in bootloader mode
|
||||
await driver.reboot_bootloader()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@click.group()
|
||||
def main():
|
||||
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
|
||||
|
||||
|
||||
@main.command
|
||||
@click.argument("usb_transport")
|
||||
@click.option(
|
||||
"--force",
|
||||
is_flag=True,
|
||||
default=False,
|
||||
help="Try to get the device info even if the USB info doesn't match",
|
||||
)
|
||||
def info(usb_transport, force):
|
||||
"""Get the firmware info."""
|
||||
asyncio.run(do_info(usb_transport, force))
|
||||
|
||||
|
||||
@main.command
|
||||
@click.argument("usb_transport")
|
||||
@click.option(
|
||||
"--force",
|
||||
is_flag=True,
|
||||
default=False,
|
||||
help="Load even if the USB info doesn't match",
|
||||
)
|
||||
def load(usb_transport, force):
|
||||
"""Load a firmware image."""
|
||||
asyncio.run(do_load(usb_transport, force))
|
||||
|
||||
|
||||
@main.command
|
||||
@click.argument("usb_transport")
|
||||
@click.option(
|
||||
"--force",
|
||||
is_flag=True,
|
||||
default=False,
|
||||
help="Attempt to reboot event if the USB info doesn't match",
|
||||
)
|
||||
def bootloader(usb_transport, force):
|
||||
"""Reboot in bootloader mode."""
|
||||
asyncio.run(do_bootloader(usb_transport, force))
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
Reference in New Issue
Block a user