forked from auracaster/bumble_mirror
feat(intel): clarify firmware/DDC flow and preserve driver metadata
- Add explanatory comments across intel driver to clarify metadata parsing. - Ensure driver selection preserves runtime options (e.g. "intel/ddc_override:AABB") so driver-specific metadata is passed through to the host and available to drivers via host.hci_metadata. - Ensure transport parsing regex and metadata extraction so transport/source metadata is populated and visible to drivers. - Example usage: passing [driver=intel/ddc_override:AABB] will be preserved and can be consumed by the Intel driver to apply a DDC override blob.
This commit is contained in:
@@ -49,6 +49,10 @@ async def get_driver_for_host(host: Host) -> Optional[Driver]:
|
|||||||
driver_classes: dict[str, type[Driver]] = {"rtk": rtk.Driver, "intel": intel.Driver}
|
driver_classes: dict[str, type[Driver]] = {"rtk": rtk.Driver, "intel": intel.Driver}
|
||||||
probe_list: Iterable[str]
|
probe_list: Iterable[str]
|
||||||
if driver_name := host.hci_metadata.get("driver"):
|
if driver_name := host.hci_metadata.get("driver"):
|
||||||
|
# The "driver" metadata may include runtime options after a '/' (for example
|
||||||
|
# "intel/ddc=..."). Keep only the base driver name (the portion before the
|
||||||
|
# first slash) so it matches a key in driver_classes (e.g. "intel").
|
||||||
|
driver_name = driver_name.split("/")[0]
|
||||||
# Only probe a single driver
|
# Only probe a single driver
|
||||||
probe_list = [driver_name]
|
probe_list = [driver_name]
|
||||||
else:
|
else:
|
||||||
|
|||||||
@@ -459,6 +459,10 @@ class Driver(common.Driver):
|
|||||||
== ModeOfOperation.OPERATIONAL
|
== ModeOfOperation.OPERATIONAL
|
||||||
):
|
):
|
||||||
logger.debug("firmware already loaded")
|
logger.debug("firmware already loaded")
|
||||||
|
# If the firmeare is already loaded, still attempt to load any
|
||||||
|
# device configuration (DDC). DDC can be applied independently of a
|
||||||
|
# firmware reload and may contain runtime overrides or patches.
|
||||||
|
await self.load_ddc_if_any()
|
||||||
return
|
return
|
||||||
|
|
||||||
# We only support some platforms and variants.
|
# We only support some platforms and variants.
|
||||||
@@ -598,17 +602,39 @@ class Driver(common.Driver):
|
|||||||
await self.reset_complete.wait()
|
await self.reset_complete.wait()
|
||||||
logger.debug("reset complete")
|
logger.debug("reset complete")
|
||||||
|
|
||||||
# Load the device config if there is one.
|
await self.load_ddc_if_any(firmware_base_name)
|
||||||
|
|
||||||
|
async def load_ddc_if_any(self, firmware_base_name: Optional[str] = None) -> None:
|
||||||
|
"""
|
||||||
|
Check for and load any Device Data Configuration (DDC) blobs.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
firmware_base_name: Base name of the selected firmware (e.g. "ibt-XXXX-YYYY").
|
||||||
|
If None, don't attempt to look up a .ddc file that
|
||||||
|
corresponds to the firmware image.
|
||||||
|
Priority:
|
||||||
|
1. If a ddc_override was provided via driver metadata, use it (highest priority).
|
||||||
|
2. Otherwise, if firmware_base_name is provided, attempt to find a .ddc file
|
||||||
|
that corresponds to the selected firmware image.
|
||||||
|
3. Finally, if a ddc_addon was provided, append/load it after the primary DDC.
|
||||||
|
"""
|
||||||
|
# If an explicit DDC override was supplied, use it and skip file lookup.
|
||||||
if self.ddc_override:
|
if self.ddc_override:
|
||||||
logger.debug("loading overridden DDC")
|
logger.debug("loading overridden DDC")
|
||||||
await self.load_device_config(self.ddc_override)
|
await self.load_device_config(self.ddc_override)
|
||||||
else:
|
else:
|
||||||
ddc_name = f"{firmware_base_name}.ddc"
|
# Only attempt .ddc file lookup if a firmware_base_name was provided.
|
||||||
ddc_path = _find_binary_path(ddc_name)
|
if firmware_base_name is None:
|
||||||
if ddc_path:
|
logger.debug(
|
||||||
logger.debug(f"loading DDC from {ddc_path}")
|
"no firmware_base_name provided; skipping .ddc file lookup"
|
||||||
ddc_data = ddc_path.read_bytes()
|
)
|
||||||
await self.load_device_config(ddc_data)
|
else:
|
||||||
|
ddc_name = f"{firmware_base_name}.ddc"
|
||||||
|
ddc_path = _find_binary_path(ddc_name)
|
||||||
|
if ddc_path:
|
||||||
|
logger.debug(f"loading DDC from {ddc_path}")
|
||||||
|
ddc_data = ddc_path.read_bytes()
|
||||||
|
await self.load_device_config(ddc_data)
|
||||||
if self.ddc_addon:
|
if self.ddc_addon:
|
||||||
logger.debug("loading DDC addon")
|
logger.debug("loading DDC addon")
|
||||||
await self.load_device_config(self.ddc_addon)
|
await self.load_device_config(self.ddc_addon)
|
||||||
|
|||||||
@@ -84,7 +84,12 @@ async def open_transport(name: str) -> Transport:
|
|||||||
scheme, *tail = name.split(':', 1)
|
scheme, *tail = name.split(':', 1)
|
||||||
spec = tail[0] if tail else None
|
spec = tail[0] if tail else None
|
||||||
metadata = None
|
metadata = None
|
||||||
if spec and (m := re.search(r'\[(\w+=\w+(?:,\w+=\w+)*,?)\]', spec)):
|
# If a spec is provided, check for a metadata section in square brackets.
|
||||||
|
# The regex captures a comma-separated list of key=value pairs (allowing an
|
||||||
|
# optional trailing comma). The key is matched by \w+ and the value by [^,\]]+,
|
||||||
|
# meaning the value may contain any character except a comma or a closing
|
||||||
|
# bracket (']').
|
||||||
|
if spec and (m := re.search(r'\[(\w+=[^,\]]+(?:,\w+=[^,\]]+)*,?)\]', spec)):
|
||||||
metadata_str = m.group(1)
|
metadata_str = m.group(1)
|
||||||
if m.start() == 0:
|
if m.start() == 0:
|
||||||
# <metadata><spec>
|
# <metadata><spec>
|
||||||
|
|||||||
Reference in New Issue
Block a user