forked from auracaster/bumble_mirror
Compare commits
16 Commits
v0.0.190
...
gbg/comman
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
090158820f | ||
|
|
1b33c9eb74 | ||
|
|
6633228975 | ||
|
|
e9cba788a4 | ||
|
|
98822cfc6b | ||
|
|
97ad7e5741 | ||
|
|
71df062e07 | ||
|
|
049f9021e9 | ||
|
|
50eae2ef54 | ||
|
|
c8883a7d0f | ||
|
|
51321caf5b | ||
|
|
51a94288e2 | ||
|
|
8758856e8c | ||
|
|
deba181857 | ||
|
|
c65188dcbf | ||
|
|
21d607898d |
2
.github/workflows/code-check.yml
vendored
2
.github/workflows/code-check.yml
vendored
@@ -33,7 +33,7 @@ jobs:
|
||||
- name: Install dependencies
|
||||
run: |
|
||||
python -m pip install --upgrade pip
|
||||
python -m pip install ".[build,test,development]"
|
||||
python -m pip install ".[build,test,development,pandora]"
|
||||
- name: Check
|
||||
run: |
|
||||
invoke project.pre-commit
|
||||
|
||||
2
.github/workflows/python-avatar.yml
vendored
2
.github/workflows/python-avatar.yml
vendored
@@ -32,7 +32,7 @@ jobs:
|
||||
- name: Install
|
||||
run: |
|
||||
python -m pip install --upgrade pip
|
||||
python -m pip install .[avatar]
|
||||
python -m pip install .[avatar,pandora]
|
||||
- name: Rootcanal
|
||||
run: nohup python -m rootcanal > rootcanal.log &
|
||||
- name: Test
|
||||
|
||||
@@ -90,6 +90,22 @@ if TYPE_CHECKING:
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Utils
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
|
||||
def show_services(services: Iterable[ServiceProxy]) -> None:
|
||||
for service in services:
|
||||
print(color(str(service), 'cyan'))
|
||||
|
||||
for characteristic in service.characteristics:
|
||||
print(color(' ' + str(characteristic), 'magenta'))
|
||||
|
||||
for descriptor in characteristic.descriptors:
|
||||
print(color(' ' + str(descriptor), 'green'))
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Proxies
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
1028
bumble/hfp.py
1028
bumble/hfp.py
File diff suppressed because it is too large
Load Diff
@@ -184,7 +184,7 @@ class Host(AbortableEventEmitter):
|
||||
self.long_term_key_provider = None
|
||||
self.link_key_provider = None
|
||||
self.pairing_io_capability_provider = None # Classic only
|
||||
self.snooper = None
|
||||
self.snooper: Optional[Snooper] = None
|
||||
|
||||
# Connect to the source and sink if specified
|
||||
if controller_source:
|
||||
@@ -530,7 +530,9 @@ class Host(AbortableEventEmitter):
|
||||
|
||||
# Check the return parameters if required
|
||||
if check_result:
|
||||
if isinstance(response.return_parameters, int):
|
||||
if isinstance(response, hci.HCI_Command_Status_Event):
|
||||
status = response.status
|
||||
elif isinstance(response.return_parameters, int):
|
||||
status = response.return_parameters
|
||||
elif isinstance(response.return_parameters, bytes):
|
||||
# return parameters first field is a one byte status code
|
||||
|
||||
@@ -19,8 +19,8 @@
|
||||
import struct
|
||||
from typing import Optional, Tuple
|
||||
|
||||
from ..gatt_client import ProfileServiceProxy
|
||||
from ..gatt import (
|
||||
from bumble.gatt_client import ServiceProxy, ProfileServiceProxy, CharacteristicProxy
|
||||
from bumble.gatt import (
|
||||
GATT_DEVICE_INFORMATION_SERVICE,
|
||||
GATT_FIRMWARE_REVISION_STRING_CHARACTERISTIC,
|
||||
GATT_HARDWARE_REVISION_STRING_CHARACTERISTIC,
|
||||
@@ -104,7 +104,16 @@ class DeviceInformationService(TemplateService):
|
||||
class DeviceInformationServiceProxy(ProfileServiceProxy):
|
||||
SERVICE_CLASS = DeviceInformationService
|
||||
|
||||
def __init__(self, service_proxy):
|
||||
manufacturer_name: Optional[UTF8CharacteristicAdapter]
|
||||
model_number: Optional[UTF8CharacteristicAdapter]
|
||||
serial_number: Optional[UTF8CharacteristicAdapter]
|
||||
hardware_revision: Optional[UTF8CharacteristicAdapter]
|
||||
firmware_revision: Optional[UTF8CharacteristicAdapter]
|
||||
software_revision: Optional[UTF8CharacteristicAdapter]
|
||||
system_id: Optional[DelegatedCharacteristicAdapter]
|
||||
ieee_regulatory_certification_data_list: Optional[CharacteristicProxy]
|
||||
|
||||
def __init__(self, service_proxy: ServiceProxy):
|
||||
self.service_proxy = service_proxy
|
||||
|
||||
for field, uuid in (
|
||||
|
||||
@@ -23,11 +23,24 @@ import time
|
||||
import usb.core
|
||||
import usb.util
|
||||
|
||||
from typing import Optional
|
||||
from usb.core import Device as UsbDevice
|
||||
from usb.core import USBError
|
||||
from usb.util import CTRL_TYPE_CLASS, CTRL_RECIPIENT_OTHER
|
||||
from usb.legacy import REQ_SET_FEATURE, REQ_CLEAR_FEATURE, CLASS_HUB
|
||||
|
||||
from .common import Transport, ParserSource
|
||||
from .. import hci
|
||||
from ..colors import color
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Constant
|
||||
# -----------------------------------------------------------------------------
|
||||
USB_PORT_FEATURE_POWER = 8
|
||||
POWER_CYCLE_DELAY = 1
|
||||
RESET_DELAY = 3
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Logging
|
||||
# -----------------------------------------------------------------------------
|
||||
@@ -214,6 +227,10 @@ async def open_pyusb_transport(spec: str) -> Transport:
|
||||
usb_find = libusb_package.find
|
||||
|
||||
# Find the device according to the spec moniker
|
||||
power_cycle = False
|
||||
if spec.startswith('!'):
|
||||
power_cycle = True
|
||||
spec = spec[1:]
|
||||
if ':' in spec:
|
||||
vendor_id, product_id = spec.split(':')
|
||||
device = usb_find(idVendor=int(vendor_id, 16), idProduct=int(product_id, 16))
|
||||
@@ -245,6 +262,14 @@ async def open_pyusb_transport(spec: str) -> Transport:
|
||||
raise ValueError('device not found')
|
||||
logger.debug(f'USB Device: {device}')
|
||||
|
||||
# Power Cycle the device
|
||||
if power_cycle:
|
||||
try:
|
||||
device = await _power_cycle(device) # type: ignore
|
||||
except Exception as e:
|
||||
logging.debug(e)
|
||||
logging.info(f"Unable to power cycle {hex(device.idVendor)} {hex(device.idProduct)}") # type: ignore
|
||||
|
||||
# Collect the metadata
|
||||
device_metadata = {'vendor_id': device.idVendor, 'product_id': device.idProduct}
|
||||
|
||||
@@ -308,3 +333,73 @@ async def open_pyusb_transport(spec: str) -> Transport:
|
||||
packet_sink.start()
|
||||
|
||||
return UsbTransport(device, packet_source, packet_sink)
|
||||
|
||||
|
||||
async def _power_cycle(device: UsbDevice) -> UsbDevice:
|
||||
"""
|
||||
For devices connected to compatible USB hubs: Performs a power cycle on a given USB device.
|
||||
This involves temporarily disabling its port on the hub and then re-enabling it.
|
||||
"""
|
||||
device_path = f'{device.bus}-{".".join(map(str, device.port_numbers))}' # type: ignore
|
||||
hub = _find_hub_by_device_path(device_path)
|
||||
|
||||
if hub:
|
||||
try:
|
||||
device_port = device.port_numbers[-1] # type: ignore
|
||||
_set_port_status(hub, device_port, False)
|
||||
await asyncio.sleep(POWER_CYCLE_DELAY)
|
||||
_set_port_status(hub, device_port, True)
|
||||
await asyncio.sleep(RESET_DELAY)
|
||||
|
||||
# Device needs to be find again otherwise it will appear as disconnected
|
||||
return usb.core.find(idVendor=device.idVendor, idProduct=device.idProduct) # type: ignore
|
||||
except USBError as e:
|
||||
logger.error(f"Adjustment needed: Please revise the udev rule for device {hex(device.idVendor)}:{hex(device.idProduct)} for proper recognition.") # type: ignore
|
||||
logger.error(e)
|
||||
|
||||
return device
|
||||
|
||||
|
||||
def _set_port_status(device: UsbDevice, port: int, on: bool):
|
||||
"""Sets the power status of a specific port on a USB hub."""
|
||||
device.ctrl_transfer(
|
||||
bmRequestType=CTRL_TYPE_CLASS | CTRL_RECIPIENT_OTHER,
|
||||
bRequest=REQ_SET_FEATURE if on else REQ_CLEAR_FEATURE,
|
||||
wIndex=port,
|
||||
wValue=USB_PORT_FEATURE_POWER,
|
||||
)
|
||||
|
||||
|
||||
def _find_device_by_path(sys_path: str) -> Optional[UsbDevice]:
|
||||
"""Finds a USB device based on its system path."""
|
||||
bus_num, *port_parts = sys_path.split('-')
|
||||
ports = [int(port) for port in port_parts[0].split('.')]
|
||||
devices = usb.core.find(find_all=True, bus=int(bus_num))
|
||||
if devices:
|
||||
for device in devices:
|
||||
if device.bus == int(bus_num) and list(device.port_numbers) == ports: # type: ignore
|
||||
return device
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def _find_hub_by_device_path(sys_path: str) -> Optional[UsbDevice]:
|
||||
"""Finds the USB hub associated with a specific device path."""
|
||||
hub_sys_path = sys_path.rsplit('.', 1)[0]
|
||||
hub_device = _find_device_by_path(hub_sys_path)
|
||||
|
||||
if hub_device is None:
|
||||
return None
|
||||
else:
|
||||
return hub_device if _is_hub(hub_device) else None
|
||||
|
||||
|
||||
def _is_hub(device: UsbDevice) -> bool:
|
||||
"""Checks if a USB device is a hub"""
|
||||
if device.bDeviceClass == CLASS_HUB: # type: ignore
|
||||
return True
|
||||
for config in device:
|
||||
for interface in config:
|
||||
if interface.bInterfaceClass == CLASS_HUB: # type: ignore
|
||||
return True
|
||||
return False
|
||||
|
||||
@@ -30,6 +30,7 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
|
||||
# A pass-through function to ease mock testing.
|
||||
async def _create_server(*args, **kw_args):
|
||||
await asyncio.get_running_loop().create_server(*args, **kw_args)
|
||||
|
||||
@@ -61,7 +61,7 @@ async def func4(x, y):
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
print("MAIN: start, loop=", asyncio.get_running_loop())
|
||||
print("MAIN: invoke func1")
|
||||
func1(1, 2)
|
||||
|
||||
@@ -21,23 +21,29 @@ import os
|
||||
import logging
|
||||
from bumble.colors import color
|
||||
from bumble.device import Device
|
||||
from bumble.hci import Address
|
||||
from bumble.transport import open_transport
|
||||
from bumble.profiles.battery_service import BatteryServiceProxy
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) != 3:
|
||||
print('Usage: battery_client.py <transport-spec> <bluetooth-address>')
|
||||
print('example: battery_client.py usb:0 E1:CA:72:48:C4:E8')
|
||||
return
|
||||
|
||||
print('<<< connecting to HCI...')
|
||||
async with await open_transport(sys.argv[1]) as (hci_source, hci_sink):
|
||||
async with await open_transport(sys.argv[1]) as hci_transport:
|
||||
print('<<< connected')
|
||||
|
||||
# Create and start a device
|
||||
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
|
||||
device = Device.with_hci(
|
||||
'Bumble',
|
||||
Address('F0:F1:F2:F3:F4:F5'),
|
||||
hci_transport.source,
|
||||
hci_transport.sink,
|
||||
)
|
||||
await device.power_on()
|
||||
|
||||
# Connect to the peer
|
||||
|
||||
@@ -29,14 +29,16 @@ from bumble.profiles.battery_service import BatteryService
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) != 3:
|
||||
print('Usage: python battery_server.py <device-config> <transport-spec>')
|
||||
print('example: python battery_server.py device1.json usb:0')
|
||||
return
|
||||
|
||||
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
|
||||
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
|
||||
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
|
||||
device = Device.from_config_file_with_hci(
|
||||
sys.argv[1], hci_transport.source, hci_transport.sink
|
||||
)
|
||||
|
||||
# Add a Battery Service to the GATT sever
|
||||
battery_service = BatteryService(lambda _: random.randint(0, 100))
|
||||
|
||||
@@ -21,12 +21,13 @@ import os
|
||||
import logging
|
||||
from bumble.colors import color
|
||||
from bumble.device import Device, Peer
|
||||
from bumble.hci import Address
|
||||
from bumble.profiles.device_information_service import DeviceInformationServiceProxy
|
||||
from bumble.transport import open_transport
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) != 3:
|
||||
print(
|
||||
'Usage: device_information_client.py <transport-spec> <bluetooth-address>'
|
||||
@@ -35,11 +36,16 @@ async def main():
|
||||
return
|
||||
|
||||
print('<<< connecting to HCI...')
|
||||
async with await open_transport(sys.argv[1]) as (hci_source, hci_sink):
|
||||
async with await open_transport(sys.argv[1]) as hci_transport:
|
||||
print('<<< connected')
|
||||
|
||||
# Create and start a device
|
||||
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
|
||||
device = Device.with_hci(
|
||||
'Bumble',
|
||||
Address('F0:F1:F2:F3:F4:F5'),
|
||||
hci_transport.source,
|
||||
hci_transport.sink,
|
||||
)
|
||||
await device.power_on()
|
||||
|
||||
# Connect to the peer
|
||||
|
||||
@@ -28,14 +28,16 @@ from bumble.profiles.device_information_service import DeviceInformationService
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) != 3:
|
||||
print('Usage: python device_info_server.py <device-config> <transport-spec>')
|
||||
print('example: python device_info_server.py device1.json usb:0')
|
||||
return
|
||||
|
||||
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
|
||||
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
|
||||
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
|
||||
device = Device.from_config_file_with_hci(
|
||||
sys.argv[1], hci_transport.source, hci_transport.sink
|
||||
)
|
||||
|
||||
# Add a Device Information Service to the GATT sever
|
||||
device_information_service = DeviceInformationService(
|
||||
@@ -64,7 +66,7 @@ async def main():
|
||||
# Go!
|
||||
await device.power_on()
|
||||
await device.start_advertising(auto_restart=True)
|
||||
await hci_source.wait_for_termination()
|
||||
await hci_transport.source.wait_for_termination()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
@@ -21,23 +21,29 @@ import os
|
||||
import logging
|
||||
from bumble.colors import color
|
||||
from bumble.device import Device
|
||||
from bumble.hci import Address
|
||||
from bumble.transport import open_transport
|
||||
from bumble.profiles.heart_rate_service import HeartRateServiceProxy
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) != 3:
|
||||
print('Usage: heart_rate_client.py <transport-spec> <bluetooth-address>')
|
||||
print('example: heart_rate_client.py usb:0 E1:CA:72:48:C4:E8')
|
||||
return
|
||||
|
||||
print('<<< connecting to HCI...')
|
||||
async with await open_transport(sys.argv[1]) as (hci_source, hci_sink):
|
||||
async with await open_transport(sys.argv[1]) as hci_transport:
|
||||
print('<<< connected')
|
||||
|
||||
# Create and start a device
|
||||
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
|
||||
device = Device.with_hci(
|
||||
'Bumble',
|
||||
Address('F0:F1:F2:F3:F4:F5'),
|
||||
hci_transport.source,
|
||||
hci_transport.sink,
|
||||
)
|
||||
await device.power_on()
|
||||
|
||||
# Connect to the peer
|
||||
|
||||
@@ -33,14 +33,16 @@ from bumble.utils import AsyncRunner
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) != 3:
|
||||
print('Usage: python heart_rate_server.py <device-config> <transport-spec>')
|
||||
print('example: python heart_rate_server.py device1.json usb:0')
|
||||
return
|
||||
|
||||
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
|
||||
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
|
||||
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
|
||||
device = Device.from_config_file_with_hci(
|
||||
sys.argv[1], hci_transport.source, hci_transport.sink
|
||||
)
|
||||
|
||||
# Keep track of accumulated expended energy
|
||||
energy_start_time = time.time()
|
||||
|
||||
@@ -416,7 +416,7 @@ async def keyboard_device(device, command):
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) < 4:
|
||||
print(
|
||||
'Usage: python keyboard.py <device-config> <transport-spec> <command>'
|
||||
@@ -434,9 +434,11 @@ async def main():
|
||||
)
|
||||
return
|
||||
|
||||
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
|
||||
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
|
||||
# Create a device to manage the host
|
||||
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
|
||||
device = Device.from_config_file_with_hci(
|
||||
sys.argv[1], hci_transport.source, hci_transport.sink
|
||||
)
|
||||
|
||||
command = sys.argv[3]
|
||||
if command == 'connect':
|
||||
|
||||
@@ -139,18 +139,20 @@ async def find_a2dp_service(connection):
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) < 4:
|
||||
print('Usage: run_a2dp_info.py <device-config> <transport-spec> <bt-addr>')
|
||||
print('example: run_a2dp_info.py classic1.json usb:0 14:7D:DA:4E:53:A8')
|
||||
return
|
||||
|
||||
print('<<< connecting to HCI...')
|
||||
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
|
||||
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
|
||||
print('<<< connected')
|
||||
|
||||
# Create a device
|
||||
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
|
||||
device = Device.from_config_file_with_hci(
|
||||
sys.argv[1], hci_transport.source, hci_transport.sink
|
||||
)
|
||||
device.classic_enabled = True
|
||||
|
||||
# Start the controller
|
||||
@@ -187,7 +189,7 @@ async def main():
|
||||
client = await AVDTP_Protocol.connect(connection, avdtp_version)
|
||||
|
||||
# Discover all endpoints on the remote device
|
||||
endpoints = await client.discover_remote_endpoints()
|
||||
endpoints = list(await client.discover_remote_endpoints())
|
||||
print(f'@@@ Found {len(endpoints)} endpoints')
|
||||
for endpoint in endpoints:
|
||||
print('@@@', endpoint)
|
||||
|
||||
@@ -19,6 +19,7 @@ import asyncio
|
||||
import sys
|
||||
import os
|
||||
import logging
|
||||
from typing import Any, Dict
|
||||
|
||||
from bumble.device import Device
|
||||
from bumble.transport import open_transport_or_link
|
||||
@@ -41,7 +42,7 @@ from bumble.a2dp import (
|
||||
SbcMediaCodecInformation,
|
||||
)
|
||||
|
||||
Context = {'output': None}
|
||||
Context: Dict[Any, Any] = {'output': None}
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@@ -104,7 +105,7 @@ def on_rtp_packet(packet):
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) < 4:
|
||||
print(
|
||||
'Usage: run_a2dp_sink.py <device-config> <transport-spec> <sbc-file> '
|
||||
@@ -114,14 +115,16 @@ async def main():
|
||||
return
|
||||
|
||||
print('<<< connecting to HCI...')
|
||||
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
|
||||
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
|
||||
print('<<< connected')
|
||||
|
||||
with open(sys.argv[3], 'wb') as sbc_file:
|
||||
Context['output'] = sbc_file
|
||||
|
||||
# Create a device
|
||||
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
|
||||
device = Device.from_config_file_with_hci(
|
||||
sys.argv[1], hci_transport.source, hci_transport.sink
|
||||
)
|
||||
device.classic_enabled = True
|
||||
|
||||
# Setup the SDP to expose the sink service
|
||||
@@ -162,7 +165,7 @@ async def main():
|
||||
await device.set_discoverable(True)
|
||||
await device.set_connectable(True)
|
||||
|
||||
await hci_source.wait_for_termination()
|
||||
await hci_transport.source.wait_for_termination()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
@@ -114,7 +114,7 @@ async def stream_packets(read_function, protocol):
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) < 4:
|
||||
print(
|
||||
'Usage: run_a2dp_source.py <device-config> <transport-spec> <sbc-file> '
|
||||
@@ -126,11 +126,13 @@ async def main():
|
||||
return
|
||||
|
||||
print('<<< connecting to HCI...')
|
||||
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
|
||||
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
|
||||
print('<<< connected')
|
||||
|
||||
# Create a device
|
||||
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
|
||||
device = Device.from_config_file_with_hci(
|
||||
sys.argv[1], hci_transport.source, hci_transport.sink
|
||||
)
|
||||
device.classic_enabled = True
|
||||
|
||||
# Setup the SDP to expose the SRC service
|
||||
@@ -186,7 +188,7 @@ async def main():
|
||||
await device.set_discoverable(True)
|
||||
await device.set_connectable(True)
|
||||
|
||||
await hci_source.wait_for_termination()
|
||||
await hci_transport.source.wait_for_termination()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
@@ -28,7 +28,7 @@ from bumble.transport import open_transport_or_link
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) < 3:
|
||||
print(
|
||||
'Usage: run_advertiser.py <config-file> <transport-spec> [type] [address]'
|
||||
@@ -50,10 +50,12 @@ async def main():
|
||||
target = None
|
||||
|
||||
print('<<< connecting to HCI...')
|
||||
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
|
||||
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
|
||||
print('<<< connected')
|
||||
|
||||
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
|
||||
device = Device.from_config_file_with_hci(
|
||||
sys.argv[1], hci_transport.source, hci_transport.sink
|
||||
)
|
||||
|
||||
if advertising_type.is_scannable:
|
||||
device.scan_response_data = bytes(
|
||||
@@ -66,7 +68,7 @@ async def main():
|
||||
|
||||
await device.power_on()
|
||||
await device.start_advertising(advertising_type=advertising_type, target=target)
|
||||
await hci_source.wait_for_termination()
|
||||
await hci_transport.source.wait_for_termination()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
@@ -49,7 +49,7 @@ ASHA_LE_PSM_OUT_CHARACTERISTIC = UUID(
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) != 4:
|
||||
print(
|
||||
'Usage: python run_asha_sink.py <device-config> <transport-spec> '
|
||||
@@ -60,8 +60,10 @@ async def main():
|
||||
|
||||
audio_out = open(sys.argv[3], 'wb')
|
||||
|
||||
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
|
||||
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
|
||||
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
|
||||
device = Device.from_config_file_with_hci(
|
||||
sys.argv[1], hci_transport.source, hci_transport.sink
|
||||
)
|
||||
|
||||
# Handler for audio control commands
|
||||
def on_audio_control_point_write(_connection, value):
|
||||
@@ -197,7 +199,7 @@ async def main():
|
||||
await device.power_on()
|
||||
await device.start_advertising(auto_restart=True)
|
||||
|
||||
await hci_source.wait_for_termination()
|
||||
await hci_transport.source.wait_for_termination()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
@@ -331,7 +331,7 @@ class Delegate(avrcp.Delegate):
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) < 3:
|
||||
print(
|
||||
'Usage: run_avrcp_controller.py <device-config> <transport-spec> '
|
||||
@@ -341,11 +341,13 @@ async def main():
|
||||
return
|
||||
|
||||
print('<<< connecting to HCI...')
|
||||
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
|
||||
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
|
||||
print('<<< connected')
|
||||
|
||||
# Create a device
|
||||
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
|
||||
device = Device.from_config_file_with_hci(
|
||||
sys.argv[1], hci_transport.source, hci_transport.sink
|
||||
)
|
||||
device.classic_enabled = True
|
||||
|
||||
# Setup the SDP to expose the sink service
|
||||
|
||||
@@ -32,7 +32,7 @@ from bumble.sdp import (
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) < 3:
|
||||
print(
|
||||
'Usage: run_classic_connect.py <device-config> <transport-spec> '
|
||||
@@ -42,11 +42,13 @@ async def main():
|
||||
return
|
||||
|
||||
print('<<< connecting to HCI...')
|
||||
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
|
||||
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
|
||||
print('<<< connected')
|
||||
|
||||
# Create a device
|
||||
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
|
||||
device = Device.from_config_file_with_hci(
|
||||
sys.argv[1], hci_transport.source, hci_transport.sink
|
||||
)
|
||||
device.classic_enabled = True
|
||||
device.le_enabled = False
|
||||
await device.power_on()
|
||||
|
||||
@@ -91,18 +91,20 @@ SDP_SERVICE_RECORDS = {
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) < 3:
|
||||
print('Usage: run_classic_discoverable.py <device-config> <transport-spec>')
|
||||
print('example: run_classic_discoverable.py classic1.json usb:04b4:f901')
|
||||
return
|
||||
|
||||
print('<<< connecting to HCI...')
|
||||
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
|
||||
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
|
||||
print('<<< connected')
|
||||
|
||||
# Create a device
|
||||
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
|
||||
device = Device.from_config_file_with_hci(
|
||||
sys.argv[1], hci_transport.source, hci_transport.sink
|
||||
)
|
||||
device.classic_enabled = True
|
||||
device.sdp_service_records = SDP_SERVICE_RECORDS
|
||||
await device.power_on()
|
||||
@@ -111,7 +113,7 @@ async def main():
|
||||
await device.set_discoverable(True)
|
||||
await device.set_connectable(True)
|
||||
|
||||
await hci_source.wait_for_termination()
|
||||
await hci_transport.source.wait_for_termination()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
@@ -20,8 +20,8 @@ import sys
|
||||
import os
|
||||
import logging
|
||||
from bumble.colors import color
|
||||
|
||||
from bumble.device import Device
|
||||
from bumble.hci import Address
|
||||
from bumble.transport import open_transport_or_link
|
||||
from bumble.core import DeviceClass
|
||||
|
||||
@@ -53,22 +53,27 @@ class DiscoveryListener(Device.Listener):
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) != 2:
|
||||
print('Usage: run_classic_discovery.py <transport-spec>')
|
||||
print('example: run_classic_discovery.py usb:04b4:f901')
|
||||
return
|
||||
|
||||
print('<<< connecting to HCI...')
|
||||
async with await open_transport_or_link(sys.argv[1]) as (hci_source, hci_sink):
|
||||
async with await open_transport_or_link(sys.argv[1]) as hci_transport:
|
||||
print('<<< connected')
|
||||
|
||||
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
|
||||
device = Device.with_hci(
|
||||
'Bumble',
|
||||
Address('F0:F1:F2:F3:F4:F5'),
|
||||
hci_transport.source,
|
||||
hci_transport.sink,
|
||||
)
|
||||
device.listener = DiscoveryListener()
|
||||
await device.power_on()
|
||||
await device.start_discovery()
|
||||
|
||||
await hci_source.wait_for_termination()
|
||||
await hci_transport.source.wait_for_termination()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
@@ -25,7 +25,7 @@ from bumble.transport import open_transport_or_link
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) < 3:
|
||||
print(
|
||||
'Usage: run_connect_and_encrypt.py <device-config> <transport-spec> '
|
||||
@@ -37,11 +37,13 @@ async def main():
|
||||
return
|
||||
|
||||
print('<<< connecting to HCI...')
|
||||
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
|
||||
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
|
||||
print('<<< connected')
|
||||
|
||||
# Create a device
|
||||
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
|
||||
device = Device.from_config_file_with_hci(
|
||||
sys.argv[1], hci_transport.source, hci_transport.sink
|
||||
)
|
||||
await device.power_on()
|
||||
|
||||
# Connect to the peer
|
||||
@@ -56,7 +58,7 @@ async def main():
|
||||
print(f'!!! Encryption failed: {error}')
|
||||
return
|
||||
|
||||
await hci_source.wait_for_termination()
|
||||
await hci_transport.source.wait_for_termination()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
@@ -36,7 +36,7 @@ from bumble.transport import open_transport_or_link
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) != 4:
|
||||
print(
|
||||
'Usage: run_controller.py <controller-address> <device-config> '
|
||||
@@ -49,7 +49,7 @@ async def main():
|
||||
return
|
||||
|
||||
print('>>> connecting to HCI...')
|
||||
async with await open_transport_or_link(sys.argv[3]) as (hci_source, hci_sink):
|
||||
async with await open_transport_or_link(sys.argv[3]) as hci_transport:
|
||||
print('>>> connected')
|
||||
|
||||
# Create a local link
|
||||
@@ -57,7 +57,10 @@ async def main():
|
||||
|
||||
# Create a first controller using the packet source/sink as its host interface
|
||||
controller1 = Controller(
|
||||
'C1', host_source=hci_source, host_sink=hci_sink, link=link
|
||||
'C1',
|
||||
host_source=hci_transport.source,
|
||||
host_sink=hci_transport.sink,
|
||||
link=link,
|
||||
)
|
||||
controller1.random_address = sys.argv[1]
|
||||
|
||||
@@ -98,7 +101,7 @@ async def main():
|
||||
await device.start_advertising()
|
||||
await device.start_scanning()
|
||||
|
||||
await hci_source.wait_for_termination()
|
||||
await hci_transport.source.wait_for_termination()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
@@ -20,9 +20,9 @@ import asyncio
|
||||
import sys
|
||||
import os
|
||||
from bumble.colors import color
|
||||
|
||||
from bumble.device import Device
|
||||
from bumble.controller import Controller
|
||||
from bumble.hci import Address
|
||||
from bumble.link import LocalLink
|
||||
from bumble.transport import open_transport_or_link
|
||||
|
||||
@@ -45,14 +45,14 @@ class ScannerListener(Device.Listener):
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) != 2:
|
||||
print('Usage: run_controller.py <transport-spec>')
|
||||
print('example: run_controller_with_scanner.py serial:/dev/pts/14,1000000')
|
||||
return
|
||||
|
||||
print('>>> connecting to HCI...')
|
||||
async with await open_transport_or_link(sys.argv[1]) as (hci_source, hci_sink):
|
||||
async with await open_transport_or_link(sys.argv[1]) as hci_transport:
|
||||
print('>>> connected')
|
||||
|
||||
# Create a local link
|
||||
@@ -60,22 +60,25 @@ async def main():
|
||||
|
||||
# Create a first controller using the packet source/sink as its host interface
|
||||
controller1 = Controller(
|
||||
'C1', host_source=hci_source, host_sink=hci_sink, link=link
|
||||
'C1',
|
||||
host_source=hci_transport.source,
|
||||
host_sink=hci_transport.sink,
|
||||
link=link,
|
||||
public_address='E0:E1:E2:E3:E4:E5',
|
||||
)
|
||||
controller1.address = 'E0:E1:E2:E3:E4:E5'
|
||||
|
||||
# Create a second controller using the same link
|
||||
controller2 = Controller('C2', link=link)
|
||||
|
||||
# Create a device with a scanner listener
|
||||
device = Device.with_hci(
|
||||
'Bumble', 'F0:F1:F2:F3:F4:F5', controller2, controller2
|
||||
'Bumble', Address('F0:F1:F2:F3:F4:F5'), controller2, controller2
|
||||
)
|
||||
device.listener = ScannerListener()
|
||||
await device.power_on()
|
||||
await device.start_scanning()
|
||||
|
||||
await hci_source.wait_for_termination()
|
||||
await hci_transport.source.wait_for_termination()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
@@ -20,31 +20,36 @@ import sys
|
||||
import os
|
||||
import logging
|
||||
from bumble.colors import color
|
||||
|
||||
from bumble.hci import Address
|
||||
from bumble.device import Device
|
||||
from bumble.transport import open_transport_or_link
|
||||
from bumble.snoop import BtSnooper
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) != 3:
|
||||
print('Usage: run_device_with_snooper.py <transport-spec> <snoop-file>')
|
||||
print('example: run_device_with_snooper.py usb:0 btsnoop.log')
|
||||
return
|
||||
|
||||
print('<<< connecting to HCI...')
|
||||
async with await open_transport_or_link(sys.argv[1]) as (hci_source, hci_sink):
|
||||
async with await open_transport_or_link(sys.argv[1]) as hci_transport:
|
||||
print('<<< connected')
|
||||
|
||||
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
|
||||
device = Device.with_hci(
|
||||
'Bumble',
|
||||
Address('F0:F1:F2:F3:F4:F5'),
|
||||
hci_transport.source,
|
||||
hci_transport.sink,
|
||||
)
|
||||
|
||||
with open(sys.argv[2], "wb") as snoop_file:
|
||||
device.host.snooper = BtSnooper(snoop_file)
|
||||
await device.power_on()
|
||||
await device.start_scanning()
|
||||
|
||||
await hci_source.wait_for_termination()
|
||||
await hci_transport.source.wait_for_termination()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
@@ -69,7 +69,7 @@ class Listener(Device.Listener):
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) < 3:
|
||||
print(
|
||||
'Usage: run_gatt_client.py <device-config> <transport-spec> '
|
||||
@@ -79,11 +79,13 @@ async def main():
|
||||
return
|
||||
|
||||
print('<<< connecting to HCI...')
|
||||
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
|
||||
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
|
||||
print('<<< connected')
|
||||
|
||||
# Create a device to manage the host, with a custom listener
|
||||
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
|
||||
device = Device.from_config_file_with_hci(
|
||||
sys.argv[1], hci_transport.source, hci_transport.sink
|
||||
)
|
||||
device.listener = Listener(device)
|
||||
await device.power_on()
|
||||
|
||||
|
||||
@@ -19,21 +19,21 @@ import asyncio
|
||||
import os
|
||||
import logging
|
||||
from bumble.colors import color
|
||||
|
||||
from bumble.core import ProtocolError
|
||||
from bumble.controller import Controller
|
||||
from bumble.device import Device, Peer
|
||||
from bumble.hci import Address
|
||||
from bumble.host import Host
|
||||
from bumble.link import LocalLink
|
||||
from bumble.gatt import (
|
||||
Service,
|
||||
Characteristic,
|
||||
Descriptor,
|
||||
show_services,
|
||||
GATT_CHARACTERISTIC_USER_DESCRIPTION_DESCRIPTOR,
|
||||
GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
|
||||
GATT_DEVICE_INFORMATION_SERVICE,
|
||||
)
|
||||
from bumble.gatt_client import show_services
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@@ -43,7 +43,7 @@ class ServerListener(Device.Listener):
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
# Create a local link
|
||||
link = LocalLink()
|
||||
|
||||
@@ -51,14 +51,18 @@ async def main():
|
||||
client_controller = Controller("client controller", link=link)
|
||||
client_host = Host()
|
||||
client_host.controller = client_controller
|
||||
client_device = Device("client", address='F0:F1:F2:F3:F4:F5', host=client_host)
|
||||
client_device = Device(
|
||||
"client", address=Address('F0:F1:F2:F3:F4:F5'), host=client_host
|
||||
)
|
||||
await client_device.power_on()
|
||||
|
||||
# Setup a stack for the server
|
||||
server_controller = Controller("server controller", link=link)
|
||||
server_host = Host()
|
||||
server_host.controller = server_controller
|
||||
server_device = Device("server", address='F6:F7:F8:F9:FA:FB', host=server_host)
|
||||
server_device = Device(
|
||||
"server", address=Address('F6:F7:F8:F9:FA:FB'), host=server_host
|
||||
)
|
||||
server_device.listener = ServerListener()
|
||||
await server_device.power_on()
|
||||
|
||||
|
||||
@@ -71,7 +71,7 @@ def my_custom_write_with_error(connection, value):
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) < 3:
|
||||
print(
|
||||
'Usage: run_gatt_server.py <device-config> <transport-spec> '
|
||||
@@ -81,11 +81,13 @@ async def main():
|
||||
return
|
||||
|
||||
print('<<< connecting to HCI...')
|
||||
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
|
||||
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
|
||||
print('<<< connected')
|
||||
|
||||
# Create a device to manage the host
|
||||
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
|
||||
device = Device.from_config_file_with_hci(
|
||||
sys.argv[1], hci_transport.source, hci_transport.sink
|
||||
)
|
||||
device.listener = Listener(device)
|
||||
|
||||
# Add a few entries to the device's GATT server
|
||||
@@ -146,7 +148,7 @@ async def main():
|
||||
else:
|
||||
await device.start_advertising(auto_restart=True)
|
||||
|
||||
await hci_source.wait_for_termination()
|
||||
await hci_transport.source.wait_for_termination()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
@@ -20,123 +20,48 @@ import sys
|
||||
import os
|
||||
import logging
|
||||
|
||||
from bumble.colors import color
|
||||
|
||||
import bumble.core
|
||||
from bumble.device import Device
|
||||
from bumble.transport import open_transport_or_link
|
||||
from bumble.core import (
|
||||
BT_HANDSFREE_SERVICE,
|
||||
BT_RFCOMM_PROTOCOL_ID,
|
||||
BT_BR_EDR_TRANSPORT,
|
||||
)
|
||||
from bumble import rfcomm, hfp
|
||||
from bumble.hci import HCI_SynchronousDataPacket
|
||||
from bumble.sdp import (
|
||||
Client as SDP_Client,
|
||||
DataElement,
|
||||
ServiceAttribute,
|
||||
SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
|
||||
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
|
||||
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
|
||||
)
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# pylint: disable-next=too-many-nested-blocks
|
||||
async def list_rfcomm_channels(device, connection):
|
||||
# Connect to the SDP Server
|
||||
sdp_client = SDP_Client(connection)
|
||||
await sdp_client.connect()
|
||||
|
||||
# Search for services that support the Handsfree Profile
|
||||
search_result = await sdp_client.search_attributes(
|
||||
[BT_HANDSFREE_SERVICE],
|
||||
[
|
||||
SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
|
||||
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
|
||||
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
|
||||
def _default_configuration() -> hfp.AgConfiguration:
|
||||
return hfp.AgConfiguration(
|
||||
supported_ag_features=[
|
||||
hfp.AgFeature.HF_INDICATORS,
|
||||
hfp.AgFeature.IN_BAND_RING_TONE_CAPABILITY,
|
||||
hfp.AgFeature.REJECT_CALL,
|
||||
hfp.AgFeature.CODEC_NEGOTIATION,
|
||||
hfp.AgFeature.ESCO_S4_SETTINGS_SUPPORTED,
|
||||
],
|
||||
supported_ag_indicators=[
|
||||
hfp.AgIndicatorState.call(),
|
||||
hfp.AgIndicatorState.service(),
|
||||
hfp.AgIndicatorState.callsetup(),
|
||||
hfp.AgIndicatorState.callsetup(),
|
||||
hfp.AgIndicatorState.signal(),
|
||||
hfp.AgIndicatorState.roam(),
|
||||
hfp.AgIndicatorState.battchg(),
|
||||
],
|
||||
supported_hf_indicators=[
|
||||
hfp.HfIndicator.ENHANCED_SAFETY,
|
||||
hfp.HfIndicator.BATTERY_LEVEL,
|
||||
],
|
||||
supported_ag_call_hold_operations=[],
|
||||
supported_audio_codecs=[hfp.AudioCodec.CVSD, hfp.AudioCodec.MSBC],
|
||||
)
|
||||
print(color('==================================', 'blue'))
|
||||
print(color('Handsfree Services:', 'yellow'))
|
||||
rfcomm_channels = []
|
||||
# pylint: disable-next=too-many-nested-blocks
|
||||
for attribute_list in search_result:
|
||||
# Look for the RFCOMM Channel number
|
||||
protocol_descriptor_list = ServiceAttribute.find_attribute_in_list(
|
||||
attribute_list, SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID
|
||||
)
|
||||
if protocol_descriptor_list:
|
||||
for protocol_descriptor in protocol_descriptor_list.value:
|
||||
if len(protocol_descriptor.value) >= 2:
|
||||
if protocol_descriptor.value[0].value == BT_RFCOMM_PROTOCOL_ID:
|
||||
print(color('SERVICE:', 'green'))
|
||||
print(
|
||||
color(' RFCOMM Channel:', 'cyan'),
|
||||
protocol_descriptor.value[1].value,
|
||||
)
|
||||
rfcomm_channels.append(protocol_descriptor.value[1].value)
|
||||
|
||||
# List profiles
|
||||
bluetooth_profile_descriptor_list = (
|
||||
ServiceAttribute.find_attribute_in_list(
|
||||
attribute_list,
|
||||
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
|
||||
)
|
||||
)
|
||||
if bluetooth_profile_descriptor_list:
|
||||
if bluetooth_profile_descriptor_list.value:
|
||||
if (
|
||||
bluetooth_profile_descriptor_list.value[0].type
|
||||
== DataElement.SEQUENCE
|
||||
):
|
||||
bluetooth_profile_descriptors = (
|
||||
bluetooth_profile_descriptor_list.value
|
||||
)
|
||||
else:
|
||||
# Sometimes, instead of a list of lists, we just
|
||||
# find a list. Fix that
|
||||
bluetooth_profile_descriptors = [
|
||||
bluetooth_profile_descriptor_list
|
||||
]
|
||||
|
||||
print(color(' Profiles:', 'green'))
|
||||
for (
|
||||
bluetooth_profile_descriptor
|
||||
) in bluetooth_profile_descriptors:
|
||||
version_major = (
|
||||
bluetooth_profile_descriptor.value[1].value >> 8
|
||||
)
|
||||
version_minor = (
|
||||
bluetooth_profile_descriptor.value[1].value
|
||||
& 0xFF
|
||||
)
|
||||
print(
|
||||
' '
|
||||
f'{bluetooth_profile_descriptor.value[0].value}'
|
||||
f' - version {version_major}.{version_minor}'
|
||||
)
|
||||
|
||||
# List service classes
|
||||
service_class_id_list = ServiceAttribute.find_attribute_in_list(
|
||||
attribute_list, SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID
|
||||
)
|
||||
if service_class_id_list:
|
||||
if service_class_id_list.value:
|
||||
print(color(' Service Classes:', 'green'))
|
||||
for service_class_id in service_class_id_list.value:
|
||||
print(' ', service_class_id.value)
|
||||
|
||||
await sdp_client.disconnect()
|
||||
return rfcomm_channels
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) < 4:
|
||||
print(
|
||||
'Usage: run_hfp_gateway.py <device-config> <transport-spec> '
|
||||
@@ -149,11 +74,13 @@ async def main():
|
||||
return
|
||||
|
||||
print('<<< connecting to HCI...')
|
||||
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
|
||||
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
|
||||
print('<<< connected')
|
||||
|
||||
# Create a device
|
||||
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
|
||||
device = Device.from_config_file_with_hci(
|
||||
sys.argv[1], hci_transport.source, hci_transport.sink
|
||||
)
|
||||
device.classic_enabled = True
|
||||
await device.power_on()
|
||||
|
||||
@@ -164,13 +91,14 @@ async def main():
|
||||
print(f'=== Connected to {connection.peer_address}!')
|
||||
|
||||
# Get a list of all the Handsfree services (should only be 1)
|
||||
channels = await list_rfcomm_channels(device, connection)
|
||||
if len(channels) == 0:
|
||||
if not (hfp_record := await hfp.find_hf_sdp_record(connection)):
|
||||
print('!!! no service found')
|
||||
return
|
||||
|
||||
# Pick the first one
|
||||
channel = channels[0]
|
||||
channel, version, hf_sdp_features = hfp_record
|
||||
print(f'HF version: {version}')
|
||||
print(f'HF features: {hf_sdp_features}')
|
||||
|
||||
# Request authentication
|
||||
print('*** Authenticating...')
|
||||
@@ -205,51 +133,9 @@ async def main():
|
||||
|
||||
device.host.on('sco_packet', on_sco)
|
||||
|
||||
# Protocol loop (just for testing at this point)
|
||||
protocol = hfp.HfpProtocol(session)
|
||||
while True:
|
||||
line = await protocol.next_line()
|
||||
ag_protocol = hfp.AgProtocol(session, _default_configuration())
|
||||
|
||||
if line.startswith('AT+BRSF='):
|
||||
protocol.send_response_line('+BRSF: 30')
|
||||
protocol.send_response_line('OK')
|
||||
elif line.startswith('AT+CIND=?'):
|
||||
protocol.send_response_line(
|
||||
'+CIND: ("call",(0,1)),("callsetup",(0-3)),("service",(0-1)),'
|
||||
'("signal",(0-5)),("roam",(0,1)),("battchg",(0-5)),'
|
||||
'("callheld",(0-2))'
|
||||
)
|
||||
protocol.send_response_line('OK')
|
||||
elif line.startswith('AT+CIND?'):
|
||||
protocol.send_response_line('+CIND: 0,0,1,4,1,5,0')
|
||||
protocol.send_response_line('OK')
|
||||
elif line.startswith('AT+CMER='):
|
||||
protocol.send_response_line('OK')
|
||||
elif line.startswith('AT+CHLD=?'):
|
||||
protocol.send_response_line('+CHLD: 0')
|
||||
protocol.send_response_line('OK')
|
||||
elif line.startswith('AT+BTRH?'):
|
||||
protocol.send_response_line('+BTRH: 0')
|
||||
protocol.send_response_line('OK')
|
||||
elif line.startswith('AT+CLIP='):
|
||||
protocol.send_response_line('OK')
|
||||
elif line.startswith('AT+VGS='):
|
||||
protocol.send_response_line('OK')
|
||||
elif line.startswith('AT+BIA='):
|
||||
protocol.send_response_line('OK')
|
||||
elif line.startswith('AT+BVRA='):
|
||||
protocol.send_response_line(
|
||||
'+BVRA: 1,1,12AA,1,1,"Message 1 from Janina"'
|
||||
)
|
||||
elif line.startswith('AT+XEVENT='):
|
||||
protocol.send_response_line('OK')
|
||||
elif line.startswith('AT+XAPL='):
|
||||
protocol.send_response_line('OK')
|
||||
else:
|
||||
print(color('UNSUPPORTED AT COMMAND', 'red'))
|
||||
protocol.send_response_line('ERROR')
|
||||
|
||||
await hci_source.wait_for_termination()
|
||||
await hci_transport.source.terminated
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
@@ -37,7 +37,7 @@ hf_protocol: Optional[HfProtocol] = None
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def on_dlc(dlc: rfcomm.DLC, configuration: hfp.Configuration):
|
||||
def on_dlc(dlc: rfcomm.DLC, configuration: hfp.HfConfiguration):
|
||||
print('*** DLC connected', dlc)
|
||||
global hf_protocol
|
||||
hf_protocol = HfProtocol(dlc, configuration)
|
||||
@@ -84,19 +84,19 @@ def on_dlc(dlc: rfcomm.DLC, configuration: hfp.Configuration):
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) < 3:
|
||||
print('Usage: run_classic_hfp.py <device-config> <transport-spec>')
|
||||
print('example: run_classic_hfp.py classic2.json usb:04b4:f901')
|
||||
return
|
||||
|
||||
print('<<< connecting to HCI...')
|
||||
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
|
||||
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
|
||||
print('<<< connected')
|
||||
|
||||
# Hands-Free profile configuration.
|
||||
# TODO: load configuration from file.
|
||||
configuration = hfp.Configuration(
|
||||
configuration = hfp.HfConfiguration(
|
||||
supported_hf_features=[
|
||||
hfp.HfFeature.THREE_WAY_CALLING,
|
||||
hfp.HfFeature.REMOTE_VOLUME_CONTROL,
|
||||
@@ -116,7 +116,9 @@ async def main():
|
||||
)
|
||||
|
||||
# Create a device
|
||||
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
|
||||
device = Device.from_config_file_with_hci(
|
||||
sys.argv[1], hci_transport.source, hci_transport.sink
|
||||
)
|
||||
device.classic_enabled = True
|
||||
|
||||
# Create and register a server
|
||||
@@ -128,7 +130,9 @@ async def main():
|
||||
|
||||
# Advertise the HFP RFComm channel in the SDP
|
||||
device.sdp_service_records = {
|
||||
0x00010001: hfp.sdp_records(0x00010001, channel_number, configuration)
|
||||
0x00010001: hfp.make_hf_sdp_records(
|
||||
0x00010001, channel_number, configuration
|
||||
)
|
||||
}
|
||||
|
||||
# Let's go!
|
||||
@@ -164,7 +168,7 @@ async def main():
|
||||
|
||||
await websockets.serve(serve, 'localhost', 8989)
|
||||
|
||||
await hci_source.wait_for_termination()
|
||||
await hci_transport.source.wait_for_termination()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
@@ -489,7 +489,7 @@ async def keyboard_device(hid_device):
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) < 3:
|
||||
print(
|
||||
'Usage: python run_hid_device.py <device-config> <transport-spec> <command>'
|
||||
@@ -601,11 +601,13 @@ async def main():
|
||||
asyncio.create_task(handle_virtual_cable_unplug())
|
||||
|
||||
print('<<< connecting to HCI...')
|
||||
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
|
||||
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
|
||||
print('<<< connected')
|
||||
|
||||
# Create a device
|
||||
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
|
||||
device = Device.from_config_file_with_hci(
|
||||
sys.argv[1], hci_transport.source, hci_transport.sink
|
||||
)
|
||||
device.classic_enabled = True
|
||||
|
||||
# Create and register HID device
|
||||
@@ -742,7 +744,7 @@ async def main():
|
||||
print("Executing in Web mode")
|
||||
await keyboard_device(hid_device)
|
||||
|
||||
await hci_source.wait_for_termination()
|
||||
await hci_transport.source.wait_for_termination()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
@@ -275,7 +275,7 @@ async def get_stream_reader(pipe) -> asyncio.StreamReader:
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) < 4:
|
||||
print(
|
||||
'Usage: run_hid_host.py <device-config> <transport-spec> '
|
||||
@@ -324,11 +324,13 @@ async def main():
|
||||
asyncio.create_task(handle_virtual_cable_unplug())
|
||||
|
||||
print('<<< connecting to HCI...')
|
||||
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
|
||||
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
|
||||
print('<<< CONNECTED')
|
||||
|
||||
# Create a device
|
||||
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
|
||||
device = Device.from_config_file_with_hci(
|
||||
sys.argv[1], hci_transport.source, hci_transport.sink
|
||||
)
|
||||
device.classic_enabled = True
|
||||
|
||||
# Create HID host and start it
|
||||
@@ -557,7 +559,7 @@ async def main():
|
||||
# Interrupt Channel
|
||||
await hid_host.connect_interrupt_channel()
|
||||
|
||||
await hci_source.wait_for_termination()
|
||||
await hci_transport.source.wait_for_termination()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
@@ -57,18 +57,20 @@ def on_my_characteristic_subscription(peer, enabled):
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) < 3:
|
||||
print('Usage: run_notifier.py <device-config> <transport-spec>')
|
||||
print('example: run_notifier.py device1.json usb:0')
|
||||
return
|
||||
|
||||
print('<<< connecting to HCI...')
|
||||
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
|
||||
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
|
||||
print('<<< connected')
|
||||
|
||||
# Create a device to manage the host
|
||||
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
|
||||
device = Device.from_config_file_with_hci(
|
||||
sys.argv[1], hci_transport.source, hci_transport.sink
|
||||
)
|
||||
device.listener = Listener(device)
|
||||
|
||||
# Add a few entries to the device's GATT server
|
||||
|
||||
@@ -165,7 +165,7 @@ async def tcp_server(tcp_port, rfcomm_session):
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) < 5:
|
||||
print(
|
||||
'Usage: run_rfcomm_client.py <device-config> <transport-spec> '
|
||||
@@ -178,11 +178,13 @@ async def main():
|
||||
return
|
||||
|
||||
print('<<< connecting to HCI...')
|
||||
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
|
||||
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
|
||||
print('<<< connected')
|
||||
|
||||
# Create a device
|
||||
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
|
||||
device = Device.from_config_file_with_hci(
|
||||
sys.argv[1], hci_transport.source, hci_transport.sink
|
||||
)
|
||||
device.classic_enabled = True
|
||||
await device.power_on()
|
||||
|
||||
@@ -192,8 +194,8 @@ async def main():
|
||||
connection = await device.connect(target_address, transport=BT_BR_EDR_TRANSPORT)
|
||||
print(f'=== Connected to {connection.peer_address}!')
|
||||
|
||||
channel = sys.argv[4]
|
||||
if channel == 'discover':
|
||||
channel_str = sys.argv[4]
|
||||
if channel_str == 'discover':
|
||||
await list_rfcomm_channels(connection)
|
||||
return
|
||||
|
||||
@@ -213,7 +215,7 @@ async def main():
|
||||
rfcomm_mux = await rfcomm_client.start()
|
||||
print('@@@ Started')
|
||||
|
||||
channel = int(channel)
|
||||
channel = int(channel_str)
|
||||
print(f'### Opening session for channel {channel}...')
|
||||
try:
|
||||
session = await rfcomm_mux.open_dlc(channel)
|
||||
@@ -229,7 +231,7 @@ async def main():
|
||||
tcp_port = int(sys.argv[5])
|
||||
asyncio.create_task(tcp_server(tcp_port, session))
|
||||
|
||||
await hci_source.wait_for_termination()
|
||||
await hci_transport.source.wait_for_termination()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
@@ -107,7 +107,7 @@ class TcpServer:
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) < 4:
|
||||
print(
|
||||
'Usage: run_rfcomm_server.py <device-config> <transport-spec> '
|
||||
@@ -124,11 +124,13 @@ async def main():
|
||||
uuid = 'E6D55659-C8B4-4B85-96BB-B1143AF6D3AE'
|
||||
|
||||
print('<<< connecting to HCI...')
|
||||
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
|
||||
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
|
||||
print('<<< connected')
|
||||
|
||||
# Create a device
|
||||
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
|
||||
device = Device.from_config_file_with_hci(
|
||||
sys.argv[1], hci_transport.source, hci_transport.sink
|
||||
)
|
||||
device.classic_enabled = True
|
||||
|
||||
# Create a TCP server
|
||||
@@ -153,7 +155,7 @@ async def main():
|
||||
await device.set_discoverable(True)
|
||||
await device.set_connectable(True)
|
||||
|
||||
await hci_source.wait_for_termination()
|
||||
await hci_transport.source.wait_for_termination()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
@@ -20,27 +20,31 @@ import sys
|
||||
import os
|
||||
import logging
|
||||
from bumble.colors import color
|
||||
|
||||
from bumble.hci import Address
|
||||
from bumble.device import Device
|
||||
from bumble.transport import open_transport_or_link
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) < 2:
|
||||
print('Usage: run_scanner.py <transport-spec> [filter]')
|
||||
print('example: run_scanner.py usb:0')
|
||||
return
|
||||
|
||||
print('<<< connecting to HCI...')
|
||||
async with await open_transport_or_link(sys.argv[1]) as (hci_source, hci_sink):
|
||||
async with await open_transport_or_link(sys.argv[1]) as hci_transport:
|
||||
print('<<< connected')
|
||||
filter_duplicates = len(sys.argv) == 3 and sys.argv[2] == 'filter'
|
||||
|
||||
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
|
||||
device = Device.with_hci(
|
||||
'Bumble',
|
||||
Address('F0:F1:F2:F3:F4:F5'),
|
||||
hci_transport.source,
|
||||
hci_transport.sink,
|
||||
)
|
||||
|
||||
@device.on('advertisement')
|
||||
def _(advertisement):
|
||||
def on_adv(advertisement):
|
||||
address_type_string = ('PUBLIC', 'RANDOM', 'PUBLIC_ID', 'RANDOM_ID')[
|
||||
advertisement.address.address_type
|
||||
]
|
||||
@@ -67,10 +71,11 @@ async def main():
|
||||
f'{advertisement.data.to_string(separator)}'
|
||||
)
|
||||
|
||||
device.on('advertisement', on_adv)
|
||||
await device.power_on()
|
||||
await device.start_scanning(filter_duplicates=filter_duplicates)
|
||||
|
||||
await hci_source.wait_for_termination()
|
||||
await hci_transport.source.wait_for_termination()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
@@ -33,7 +33,6 @@ include_package_data = True
|
||||
install_requires =
|
||||
aiohttp ~= 3.8; platform_system!='Emscripten'
|
||||
appdirs >= 1.4; platform_system!='Emscripten'
|
||||
bt-test-interfaces >= 0.0.6; platform_system!='Emscripten'
|
||||
click >= 8.1.3; platform_system!='Emscripten'
|
||||
cryptography == 39; platform_system!='Emscripten'
|
||||
# Pyodide bundles a version of cryptography that is built for wasm, which may not match the
|
||||
@@ -83,7 +82,7 @@ build =
|
||||
build >= 0.7
|
||||
test =
|
||||
pytest >= 8.0
|
||||
pytest-asyncio >= 0.21.1
|
||||
pytest-asyncio >= 0.23.5
|
||||
pytest-html >= 3.2.0
|
||||
coverage >= 6.4
|
||||
development =
|
||||
@@ -100,6 +99,8 @@ development =
|
||||
avatar =
|
||||
pandora-avatar == 0.0.9
|
||||
rootcanal == 1.10.0 ; python_version>='3.10'
|
||||
pandora =
|
||||
bt-test-interfaces >= 0.0.6
|
||||
documentation =
|
||||
mkdocs >= 1.4.0
|
||||
mkdocs-material >= 8.5.6
|
||||
|
||||
@@ -19,8 +19,9 @@ import asyncio
|
||||
import logging
|
||||
import os
|
||||
import pytest
|
||||
import pytest_asyncio
|
||||
|
||||
from typing import Tuple
|
||||
from typing import Tuple, Optional
|
||||
|
||||
from .test_utils import TwoDevices
|
||||
from bumble import core
|
||||
@@ -35,10 +36,94 @@ from bumble import hci
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def _default_hf_configuration() -> hfp.HfConfiguration:
|
||||
return hfp.HfConfiguration(
|
||||
supported_hf_features=[
|
||||
hfp.HfFeature.CODEC_NEGOTIATION,
|
||||
hfp.HfFeature.ESCO_S4_SETTINGS_SUPPORTED,
|
||||
hfp.HfFeature.HF_INDICATORS,
|
||||
hfp.HfFeature.ENHANCED_CALL_STATUS,
|
||||
hfp.HfFeature.THREE_WAY_CALLING,
|
||||
hfp.HfFeature.CLI_PRESENTATION_CAPABILITY,
|
||||
],
|
||||
supported_hf_indicators=[
|
||||
hfp.HfIndicator.ENHANCED_SAFETY,
|
||||
hfp.HfIndicator.BATTERY_LEVEL,
|
||||
],
|
||||
supported_audio_codecs=[
|
||||
hfp.AudioCodec.CVSD,
|
||||
hfp.AudioCodec.MSBC,
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def _default_hf_sdp_features() -> hfp.HfSdpFeature:
|
||||
return (
|
||||
hfp.HfSdpFeature.WIDE_BAND
|
||||
| hfp.HfSdpFeature.THREE_WAY_CALLING
|
||||
| hfp.HfSdpFeature.CLI_PRESENTATION_CAPABILITY
|
||||
)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def _default_ag_configuration() -> hfp.AgConfiguration:
|
||||
return hfp.AgConfiguration(
|
||||
supported_ag_features=[
|
||||
hfp.AgFeature.HF_INDICATORS,
|
||||
hfp.AgFeature.IN_BAND_RING_TONE_CAPABILITY,
|
||||
hfp.AgFeature.REJECT_CALL,
|
||||
hfp.AgFeature.CODEC_NEGOTIATION,
|
||||
hfp.AgFeature.ESCO_S4_SETTINGS_SUPPORTED,
|
||||
hfp.AgFeature.ENHANCED_CALL_STATUS,
|
||||
hfp.AgFeature.THREE_WAY_CALLING,
|
||||
],
|
||||
supported_ag_indicators=[
|
||||
hfp.AgIndicatorState.call(),
|
||||
hfp.AgIndicatorState.service(),
|
||||
hfp.AgIndicatorState.callsetup(),
|
||||
hfp.AgIndicatorState.callsetup(),
|
||||
hfp.AgIndicatorState.signal(),
|
||||
hfp.AgIndicatorState.roam(),
|
||||
hfp.AgIndicatorState.battchg(),
|
||||
],
|
||||
supported_hf_indicators=[
|
||||
hfp.HfIndicator.ENHANCED_SAFETY,
|
||||
hfp.HfIndicator.BATTERY_LEVEL,
|
||||
],
|
||||
supported_ag_call_hold_operations=[
|
||||
hfp.CallHoldOperation.ADD_HELD_CALL,
|
||||
hfp.CallHoldOperation.HOLD_ALL_ACTIVE_CALLS,
|
||||
hfp.CallHoldOperation.HOLD_ALL_CALLS_EXCEPT,
|
||||
hfp.CallHoldOperation.RELEASE_ALL_ACTIVE_CALLS,
|
||||
hfp.CallHoldOperation.RELEASE_ALL_HELD_CALLS,
|
||||
hfp.CallHoldOperation.RELEASE_SPECIFIC_CALL,
|
||||
hfp.CallHoldOperation.CONNECT_TWO_CALLS,
|
||||
],
|
||||
supported_audio_codecs=[hfp.AudioCodec.CVSD, hfp.AudioCodec.MSBC],
|
||||
)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def _default_ag_sdp_features() -> hfp.AgSdpFeature:
|
||||
return (
|
||||
hfp.AgSdpFeature.WIDE_BAND
|
||||
| hfp.AgSdpFeature.IN_BAND_RING_TONE_CAPABILITY
|
||||
| hfp.AgSdpFeature.THREE_WAY_CALLING
|
||||
)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def make_hfp_connections(
|
||||
hf_config: hfp.Configuration,
|
||||
) -> Tuple[hfp.HfProtocol, hfp.HfpProtocol]:
|
||||
hf_config: Optional[hfp.HfConfiguration] = None,
|
||||
ag_config: Optional[hfp.AgConfiguration] = None,
|
||||
):
|
||||
if not hf_config:
|
||||
hf_config = _default_hf_configuration()
|
||||
if not ag_config:
|
||||
ag_config = _default_ag_configuration()
|
||||
|
||||
# Setup devices
|
||||
devices = TwoDevices()
|
||||
await devices.setup_connection()
|
||||
@@ -55,38 +140,371 @@ async def make_hfp_connections(
|
||||
|
||||
# Setup HFP connection
|
||||
hf = hfp.HfProtocol(client_dlc, hf_config)
|
||||
ag = hfp.HfpProtocol(server_dlc)
|
||||
return hf, ag
|
||||
ag = hfp.AgProtocol(server_dlc, ag_config)
|
||||
|
||||
await hf.initiate_slc()
|
||||
return (hf, ag)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest_asyncio.fixture
|
||||
async def hfp_connections():
|
||||
hf, ag = await make_hfp_connections()
|
||||
hf_loop_task = asyncio.create_task(hf.run())
|
||||
|
||||
try:
|
||||
yield (hf, ag)
|
||||
finally:
|
||||
# Close the coroutine.
|
||||
hf.unsolicited_queue.put_nowait(None)
|
||||
await hf_loop_task
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_slc():
|
||||
hf_config = hfp.Configuration(
|
||||
supported_hf_features=[], supported_hf_indicators=[], supported_audio_codecs=[]
|
||||
)
|
||||
hf, ag = await make_hfp_connections(hf_config)
|
||||
|
||||
async def ag_loop():
|
||||
while line := await ag.next_line():
|
||||
if line.startswith('AT+BRSF'):
|
||||
ag.send_response_line('+BRSF: 0')
|
||||
elif line.startswith('AT+CIND=?'):
|
||||
ag.send_response_line(
|
||||
'+CIND: ("call",(0,1)),("callsetup",(0-3)),("service",(0-1)),'
|
||||
'("signal",(0-5)),("roam",(0,1)),("battchg",(0-5)),'
|
||||
'("callheld",(0-2))'
|
||||
async def test_slc_with_minimal_features():
|
||||
hf, ag = await make_hfp_connections(
|
||||
hfp.HfConfiguration(
|
||||
supported_audio_codecs=[],
|
||||
supported_hf_features=[],
|
||||
supported_hf_indicators=[],
|
||||
),
|
||||
hfp.AgConfiguration(
|
||||
supported_ag_call_hold_operations=[],
|
||||
supported_ag_features=[],
|
||||
supported_ag_indicators=[
|
||||
hfp.AgIndicatorState(
|
||||
indicator=hfp.AgIndicator.CALL,
|
||||
supported_values={0, 1},
|
||||
current_status=0,
|
||||
)
|
||||
elif line.startswith('AT+CIND?'):
|
||||
ag.send_response_line('+CIND: 0,0,1,4,1,5,0')
|
||||
ag.send_response_line('OK')
|
||||
],
|
||||
supported_hf_indicators=[],
|
||||
supported_audio_codecs=[],
|
||||
),
|
||||
)
|
||||
|
||||
ag_task = asyncio.create_task(ag_loop())
|
||||
assert hf.supported_ag_features == ag.supported_ag_features
|
||||
assert hf.supported_hf_features == ag.supported_hf_features
|
||||
assert hf.supported_ag_call_hold_operations == ag.supported_ag_call_hold_operations
|
||||
for a, b in zip(hf.ag_indicators, ag.ag_indicators):
|
||||
assert a.indicator == b.indicator
|
||||
assert a.current_status == b.current_status
|
||||
|
||||
await hf.initiate_slc()
|
||||
ag_task.cancel()
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_slc(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
|
||||
hf, ag = hfp_connections
|
||||
|
||||
assert hf.supported_ag_features == ag.supported_ag_features
|
||||
assert hf.supported_hf_features == ag.supported_hf_features
|
||||
assert hf.supported_ag_call_hold_operations == ag.supported_ag_call_hold_operations
|
||||
for a, b in zip(hf.ag_indicators, ag.ag_indicators):
|
||||
assert a.indicator == b.indicator
|
||||
assert a.current_status == b.current_status
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_ag_indicator(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
|
||||
hf, ag = hfp_connections
|
||||
|
||||
future = asyncio.get_running_loop().create_future()
|
||||
hf.on('ag_indicator', future.set_result)
|
||||
|
||||
ag.update_ag_indicator(hfp.AgIndicator.CALL, 1)
|
||||
|
||||
indicator: hfp.AgIndicatorState = await future
|
||||
assert indicator.current_status == 1
|
||||
assert indicator.indicator == hfp.AgIndicator.CALL
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_hf_indicator(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
|
||||
hf, ag = hfp_connections
|
||||
|
||||
future = asyncio.get_running_loop().create_future()
|
||||
ag.on('hf_indicator', future.set_result)
|
||||
|
||||
await hf.execute_command('AT+BIEV=2,100')
|
||||
|
||||
indicator: hfp.HfIndicatorState = await future
|
||||
assert indicator.current_status == 100
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_codec_negotiation(
|
||||
hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]
|
||||
):
|
||||
hf, ag = hfp_connections
|
||||
|
||||
futures = [
|
||||
asyncio.get_running_loop().create_future(),
|
||||
asyncio.get_running_loop().create_future(),
|
||||
]
|
||||
hf.on('codec_negotiation', futures[0].set_result)
|
||||
ag.on('codec_negotiation', futures[1].set_result)
|
||||
await ag.negotiate_codec(hfp.AudioCodec.MSBC)
|
||||
|
||||
assert await futures[0] == await futures[1]
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_dial(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
|
||||
hf, ag = hfp_connections
|
||||
NUMBER = 'ATD123456789'
|
||||
|
||||
future = asyncio.get_running_loop().create_future()
|
||||
ag.on('dial', future.set_result)
|
||||
await hf.execute_command(f'ATD{NUMBER}')
|
||||
|
||||
number: str = await future
|
||||
assert number == NUMBER
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_answer(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
|
||||
hf, ag = hfp_connections
|
||||
|
||||
future = asyncio.get_running_loop().create_future()
|
||||
ag.on('answer', lambda: future.set_result(None))
|
||||
await hf.answer_incoming_call()
|
||||
|
||||
await future
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_reject_incoming_call(
|
||||
hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]
|
||||
):
|
||||
hf, ag = hfp_connections
|
||||
|
||||
future = asyncio.get_running_loop().create_future()
|
||||
ag.on('hang_up', lambda: future.set_result(None))
|
||||
await hf.reject_incoming_call()
|
||||
|
||||
await future
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_terminate_call(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
|
||||
hf, ag = hfp_connections
|
||||
|
||||
future = asyncio.get_running_loop().create_future()
|
||||
ag.on('hang_up', lambda: future.set_result(None))
|
||||
await hf.terminate_call()
|
||||
|
||||
await future
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_query_calls_without_calls(
|
||||
hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]
|
||||
):
|
||||
hf, ag = hfp_connections
|
||||
|
||||
await hf.query_current_calls() == []
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_query_calls_with_calls(
|
||||
hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]
|
||||
):
|
||||
hf, ag = hfp_connections
|
||||
ag.calls.append(
|
||||
hfp.CallInfo(
|
||||
index=1,
|
||||
direction=hfp.CallInfoDirection.MOBILE_ORIGINATED_CALL,
|
||||
status=hfp.CallInfoStatus.ACTIVE,
|
||||
mode=hfp.CallInfoMode.VOICE,
|
||||
multi_party=hfp.CallInfoMultiParty.NOT_IN_CONFERENCE,
|
||||
number='123456789',
|
||||
)
|
||||
)
|
||||
|
||||
await hf.query_current_calls() == ag.calls
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize(
|
||||
"operation,",
|
||||
(
|
||||
hfp.CallHoldOperation.RELEASE_ALL_HELD_CALLS,
|
||||
hfp.CallHoldOperation.RELEASE_ALL_ACTIVE_CALLS,
|
||||
hfp.CallHoldOperation.HOLD_ALL_ACTIVE_CALLS,
|
||||
hfp.CallHoldOperation.ADD_HELD_CALL,
|
||||
hfp.CallHoldOperation.CONNECT_TWO_CALLS,
|
||||
),
|
||||
)
|
||||
async def test_hold_call_without_call_index(
|
||||
hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol],
|
||||
operation: hfp.CallHoldOperation,
|
||||
):
|
||||
hf, ag = hfp_connections
|
||||
call_hold_future = asyncio.get_running_loop().create_future()
|
||||
ag.on("call_hold", lambda op, index: call_hold_future.set_result((op, index)))
|
||||
|
||||
await hf.execute_command(f"AT+CHLD={operation.value}")
|
||||
|
||||
assert (await call_hold_future) == (operation, None)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize(
|
||||
"operation,",
|
||||
(
|
||||
hfp.CallHoldOperation.RELEASE_SPECIFIC_CALL,
|
||||
hfp.CallHoldOperation.HOLD_ALL_CALLS_EXCEPT,
|
||||
),
|
||||
)
|
||||
async def test_hold_call_with_call_index(
|
||||
hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol],
|
||||
operation: hfp.CallHoldOperation,
|
||||
):
|
||||
hf, ag = hfp_connections
|
||||
call_hold_future = asyncio.get_running_loop().create_future()
|
||||
ag.on("call_hold", lambda op, index: call_hold_future.set_result((op, index)))
|
||||
ag.calls.append(
|
||||
hfp.CallInfo(
|
||||
index=1,
|
||||
direction=hfp.CallInfoDirection.MOBILE_ORIGINATED_CALL,
|
||||
status=hfp.CallInfoStatus.ACTIVE,
|
||||
mode=hfp.CallInfoMode.VOICE,
|
||||
multi_party=hfp.CallInfoMultiParty.NOT_IN_CONFERENCE,
|
||||
number='123456789',
|
||||
)
|
||||
)
|
||||
|
||||
await hf.execute_command(f"AT+CHLD={operation.value.replace('x', '1')}")
|
||||
|
||||
assert (await call_hold_future) == (operation, 1)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_ring(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
|
||||
hf, ag = hfp_connections
|
||||
ring_future = asyncio.get_running_loop().create_future()
|
||||
hf.on("ring", lambda: ring_future.set_result(None))
|
||||
|
||||
ag.send_ring()
|
||||
|
||||
await ring_future
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_speaker_volume(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
|
||||
hf, ag = hfp_connections
|
||||
speaker_volume_future = asyncio.get_running_loop().create_future()
|
||||
hf.on("speaker_volume", speaker_volume_future.set_result)
|
||||
|
||||
ag.set_speaker_volume(10)
|
||||
|
||||
assert await speaker_volume_future == 10
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_microphone_volume(
|
||||
hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]
|
||||
):
|
||||
hf, ag = hfp_connections
|
||||
microphone_volume_future = asyncio.get_running_loop().create_future()
|
||||
hf.on("microphone_volume", microphone_volume_future.set_result)
|
||||
|
||||
ag.set_microphone_volume(10)
|
||||
|
||||
assert await microphone_volume_future == 10
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_cli_notification(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
|
||||
hf, ag = hfp_connections
|
||||
cli_notification_future = asyncio.get_running_loop().create_future()
|
||||
hf.on("cli_notification", cli_notification_future.set_result)
|
||||
|
||||
ag.send_cli_notification(
|
||||
hfp.CallLineIdentification(number="\"123456789\"", type=129, alpha="\"Bumble\"")
|
||||
)
|
||||
|
||||
assert await cli_notification_future == hfp.CallLineIdentification(
|
||||
number="123456789", type=129, alpha="Bumble", subaddr="", satype=None
|
||||
)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_voice_recognition_from_hf(
|
||||
hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]
|
||||
):
|
||||
hf, ag = hfp_connections
|
||||
voice_recognition_future = asyncio.get_running_loop().create_future()
|
||||
ag.on("voice_recognition", voice_recognition_future.set_result)
|
||||
|
||||
await hf.execute_command("AT+BVRA=1")
|
||||
|
||||
assert await voice_recognition_future == hfp.VoiceRecognitionState.ENABLE
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_voice_recognition_from_ag(
|
||||
hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]
|
||||
):
|
||||
hf, ag = hfp_connections
|
||||
voice_recognition_future = asyncio.get_running_loop().create_future()
|
||||
hf.on("voice_recognition", voice_recognition_future.set_result)
|
||||
|
||||
ag.send_response("+BVRA: 1")
|
||||
|
||||
assert await voice_recognition_future == hfp.VoiceRecognitionState.ENABLE
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_hf_sdp_record():
|
||||
devices = TwoDevices()
|
||||
await devices.setup_connection()
|
||||
|
||||
devices[0].sdp_service_records[1] = hfp.make_hf_sdp_records(
|
||||
1, 2, _default_hf_configuration(), hfp.ProfileVersion.V1_8
|
||||
)
|
||||
|
||||
assert await hfp.find_hf_sdp_record(devices.connections[1]) == (
|
||||
2,
|
||||
hfp.ProfileVersion.V1_8,
|
||||
_default_hf_sdp_features(),
|
||||
)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_ag_sdp_record():
|
||||
devices = TwoDevices()
|
||||
await devices.setup_connection()
|
||||
|
||||
devices[0].sdp_service_records[1] = hfp.make_ag_sdp_records(
|
||||
1, 2, _default_ag_configuration(), hfp.ProfileVersion.V1_8
|
||||
)
|
||||
|
||||
assert await hfp.find_ag_sdp_record(devices.connections[1]) == (
|
||||
2,
|
||||
hfp.ProfileVersion.V1_8,
|
||||
_default_ag_sdp_features(),
|
||||
)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
Reference in New Issue
Block a user