mirror of
https://github.com/google/bumble.git
synced 2026-05-10 04:18:03 +00:00
wip (+4 squashed commits)
Squashed commits: [d29a350] wip [7f541ed] wip [1e2902e] basic working version [14b497a] wip
This commit is contained in:
159
utils/rtk_util.py
Normal file
159
utils/rtk_util.py
Normal file
@@ -0,0 +1,159 @@
|
||||
# Copyright 2021-2022 Google LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# https://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Imports
|
||||
# -----------------------------------------------------------------------------
|
||||
import logging
|
||||
import asyncio
|
||||
import os
|
||||
|
||||
import click
|
||||
|
||||
from bumble import transport
|
||||
from bumble.host import Host
|
||||
from bumble.drivers import rtk
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Logging
|
||||
# -----------------------------------------------------------------------------
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def do_parse(firmware_path):
|
||||
with open(firmware_path, 'rb') as firmware_file:
|
||||
firmware_data = firmware_file.read()
|
||||
firmware = rtk.Firmware(firmware_data)
|
||||
print(f'Firmware: version=0x{firmware.version:08X} project_id=0x{firmware.project_id:04X}')
|
||||
for patch in firmware.patches:
|
||||
print(
|
||||
f" Patch: chip_id=0x{patch[0]:04X}, "
|
||||
f"{len(patch[1])} bytes, "
|
||||
f"SVN Version={patch[2]:08X}"
|
||||
)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def do_load(usb_transport, force):
|
||||
async with await transport.open_transport_or_link(usb_transport) as (
|
||||
hci_source,
|
||||
hci_sink,
|
||||
):
|
||||
# Create a host to communicate with the device
|
||||
host = Host(hci_source, hci_sink)
|
||||
await host.reset(raw=True)
|
||||
|
||||
# Get the driver.
|
||||
driver = await rtk.Driver.for_host(host, force)
|
||||
if driver is None:
|
||||
if not force:
|
||||
print("Firmware already loaded or no supported driver for this device.")
|
||||
return
|
||||
|
||||
await driver.download_firmware()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def do_drop(usb_transport):
|
||||
async with await transport.open_transport_or_link(usb_transport) as (
|
||||
hci_source,
|
||||
hci_sink,
|
||||
):
|
||||
# Create a host to communicate with the device
|
||||
host = Host(hci_source, hci_sink)
|
||||
await host.reset(raw=True)
|
||||
|
||||
# Tell the device to reset/drop any loaded patch
|
||||
await rtk.Driver.drop_firmware(host)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def do_info(usb_transport, force):
|
||||
async with await transport.open_transport(usb_transport) as (
|
||||
hci_source,
|
||||
hci_sink,
|
||||
):
|
||||
# Create a host to communicate with the device
|
||||
host = Host(hci_source, hci_sink)
|
||||
await host.reset(raw=True)
|
||||
|
||||
# Check if this is a supported device.
|
||||
if not force and not rtk.Driver.check(host):
|
||||
print("USB device not supported by this RTK driver")
|
||||
return
|
||||
|
||||
# Get the driver info.
|
||||
driver_info = await rtk.Driver.driver_info_for_host(host)
|
||||
if driver_info:
|
||||
print(
|
||||
"Driver:\n"
|
||||
f" ROM: {driver_info.rom:04X}\n"
|
||||
f" Firmware: {driver_info.fw_name}\n"
|
||||
f" Config: {driver_info.config_name}\n"
|
||||
)
|
||||
else:
|
||||
print("Firmware already loaded or no supported driver for this device.")
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@click.group()
|
||||
def main():
|
||||
pass
|
||||
|
||||
|
||||
@main.command
|
||||
@click.argument("firmware_path")
|
||||
def parse(firmware_path):
|
||||
"""Parse a firmware image."""
|
||||
do_parse(firmware_path)
|
||||
|
||||
|
||||
@main.command
|
||||
@click.argument("usb_transport")
|
||||
@click.option(
|
||||
"--force",
|
||||
is_flag=True,
|
||||
default=False,
|
||||
help="Load even if the USB info doesn't match",
|
||||
)
|
||||
def load(usb_transport, force):
|
||||
"""Load a firmware image into the USB dongle."""
|
||||
asyncio.run(do_load(usb_transport, force))
|
||||
|
||||
|
||||
@main.command
|
||||
@click.argument("usb_transport")
|
||||
def drop(usb_transport):
|
||||
"""Drop a firmware image from the USB dongle."""
|
||||
asyncio.run(do_drop(usb_transport))
|
||||
|
||||
|
||||
@main.command
|
||||
@click.argument("usb_transport")
|
||||
@click.option(
|
||||
"--force",
|
||||
is_flag=True,
|
||||
default=False,
|
||||
help="Try to get the device info even if the USB info doesn't match",
|
||||
)
|
||||
def info(usb_transport, force):
|
||||
"""Get the firmware info from a USB dongle."""
|
||||
asyncio.run(do_info(usb_transport, force))
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
if __name__ == '__main__':
|
||||
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
|
||||
main()
|
||||
Reference in New Issue
Block a user