Compare commits

...

30 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod
12af7a526c fix incorrect var reference 2024-05-12 11:59:05 -07:00
zxzxwu
8781943646 Merge pull request #483 from zxzxwu/rfc
RFCOMM: Handle packets received before DLC sink set
2024-05-10 16:34:57 +08:00
Gilles Boccon-Gibod
7fbfdb634c Merge pull request #481 from google/gbg/command-status-fix
allow checking results for HCI_Command_Status_Event
2024-05-09 19:50:10 -07:00
Josh Wu
9682077f6b RFCOMM: Avoid receive packets before DLC sink set 2024-05-09 17:57:13 +08:00
Gilles Boccon-Gibod
22eb405fde Merge pull request #482 from servusdei2018/main
bumble.js(PacketSink): Implement asynchronous packet processing
2024-05-08 20:16:04 -07:00
zxzxwu
593c61973f Merge pull request #480 from zxzxwu/hfp-ag
HFP: Add AG example and fix errors
2024-05-07 17:50:01 +08:00
Josh Wu
ccff32102f HFP: Add example and fix AG errors 2024-05-07 00:36:52 +08:00
Nate
851d62c6c9 bumble.js(PacketSink): Implement asynchronous packet processing 2024-05-05 15:03:22 -04:00
Gilles Boccon-Gibod
090158820f allow checking results for HCI_Command_Status_Event 2024-05-04 12:17:05 -07:00
zxzxwu
26e6650038 Merge pull request #477 from zxzxwu/hfp-ag
Fix HFP query call status
2024-05-02 01:17:17 +08:00
Josh Wu
c48568aabe Fix HFP query call status 2024-04-30 03:13:38 +00:00
zxzxwu
1b33c9eb74 Merge pull request #475 from zxzxwu/hfp-ag
Add more HFP command suppport
2024-04-26 12:01:20 +08:00
zxzxwu
6633228975 Add more HFP command suppport
* Support all Call Hold Operation
* Support CLI Presentation
* Support Voice Recognition
* Support RING and Volume Changes
* [AG] Support Enhanced Call Status
* Minor fixes
2024-04-24 15:29:48 +00:00
Gilles Boccon-Gibod
e9cba788a4 Merge pull request #473 from google/barbibulle-patch-2
quick fix: revert to protobuf 3.12.4
2024-04-22 11:46:04 +02:00
Gilles Boccon-Gibod
98822cfc6b quick fix: revert to protobuf 3.12.4
The upgrade to 4.x wasn't really needed, and breaks some users.
2024-04-18 21:20:18 -07:00
Gilles Boccon-Gibod
97ad7e5741 Merge pull request #472 from google/gbg/update-pandora-deps
update protobuf dep and make pandora install optional
2024-04-18 11:21:29 -07:00
Charlie Boutier
71df062e07 pyusb: power_cycle if '!' is present at the start of the transport 2024-04-17 14:12:55 -07:00
Charlie Boutier
049f9021e9 pyusb: powercycle the dongle 2024-04-17 14:12:55 -07:00
Gilles Boccon-Gibod
50eae2ef54 add pandora to code-check action 2024-04-17 13:19:07 -07:00
Gilles Boccon-Gibod
c8883a7d0f update protobuf dep and make pandora install optional 2024-04-17 13:14:21 -07:00
zxzxwu
51321caf5b Merge pull request #470 from zxzxwu/examples
Type hint all examples
2024-04-16 02:56:08 +08:00
zxzxwu
51a94288e2 Type hint all examples 2024-04-15 12:48:21 +00:00
zxzxwu
8758856e8c Merge pull request #465 from zxzxwu/hfp-ag
HFP AG implementation
2024-04-12 22:15:25 +08:00
Josh Wu
deba181857 HFP AG implementation 2024-04-10 09:51:37 +00:00
zxzxwu
c65188dcbf Merge pull request #466 from zxzxwu/format
Fix format presubmit error
2024-04-09 02:59:36 +08:00
Josh Wu
21d607898d Fix format presubmit error 2024-04-09 01:44:04 +08:00
Gilles Boccon-Gibod
2698d4534e Merge pull request #435 from jeru/main
open_tcp_server_transport: allow explicit sock as input.
2024-04-04 19:17:07 -07:00
zxzxwu
bbcd64286a Merge pull request #463 from zxzxwu/hfp
Correct HFP AG indicator index
2024-04-04 12:53:19 +08:00
Josh Wu
dc1204531e Correct HFP AG indicator index 2024-04-03 17:58:04 +08:00
Cheng Sheng
1ceeccbbc0 open_tcp_server_transport: allow explicit sock as input.
When a user doesn't need an exact port, but cares more about getting
SOME unused port, they can do:
* Create a socket outside with port=None or port=0.
* Use socket.getsockname()[1] to get the allocated port and pass to the
TCP client somehow.
* Use the created socket to create a TCP server transport.

Use-case: unit-testing embedded software that implements a BLE host. The
controller will be a Bumble controller, connected to the host via a TCP
channel.
* The host will have a TCP-client HCI transport for testing.
* The pytest setup code will allocate the TCP server and pass the port
number to the host.

Also add some unittests with python mock.
2024-03-13 19:34:05 +01:00
51 changed files with 2533 additions and 427 deletions

View File

@@ -33,7 +33,7 @@ jobs:
- name: Install dependencies
run: |
python -m pip install --upgrade pip
python -m pip install ".[build,test,development]"
python -m pip install ".[build,test,development,pandora]"
- name: Check
run: |
invoke project.pre-commit

View File

@@ -32,7 +32,7 @@ jobs:
- name: Install
run: |
python -m pip install --upgrade pip
python -m pip install .[avatar]
python -m pip install .[avatar,pandora]
- name: Rootcanal
run: nohup python -m rootcanal > rootcanal.log &
- name: Test

2
.gitignore vendored
View File

@@ -6,6 +6,8 @@ dist/
docs/mkdocs/site
test-results.xml
__pycache__
# Vim
.*.sw*
# generated by setuptools_scm
bumble/_version.py
.vscode/launch.json

View File

@@ -2184,7 +2184,7 @@ class Device(CompositeEventEmitter):
# controller.
await self.send_command(
HCI_LE_Remove_Advertising_Set_Command(
advertising_handle=advertising_data
advertising_handle=advertising_handle
),
check_result=False,
)

View File

@@ -90,6 +90,22 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# Utils
# -----------------------------------------------------------------------------
def show_services(services: Iterable[ServiceProxy]) -> None:
for service in services:
print(color(str(service), 'cyan'))
for characteristic in service.characteristics:
print(color(' ' + str(characteristic), 'magenta'))
for descriptor in characteristic.descriptors:
print(color(' ' + str(descriptor), 'green'))
# -----------------------------------------------------------------------------
# Proxies
# -----------------------------------------------------------------------------

File diff suppressed because it is too large Load Diff

View File

@@ -184,7 +184,7 @@ class Host(AbortableEventEmitter):
self.long_term_key_provider = None
self.link_key_provider = None
self.pairing_io_capability_provider = None # Classic only
self.snooper = None
self.snooper: Optional[Snooper] = None
# Connect to the source and sink if specified
if controller_source:
@@ -530,7 +530,9 @@ class Host(AbortableEventEmitter):
# Check the return parameters if required
if check_result:
if isinstance(response.return_parameters, int):
if isinstance(response, hci.HCI_Command_Status_Event):
status = response.status
elif isinstance(response.return_parameters, int):
status = response.return_parameters
elif isinstance(response.return_parameters, bytes):
# return parameters first field is a one byte status code

View File

@@ -19,8 +19,8 @@
import struct
from typing import Optional, Tuple
from ..gatt_client import ProfileServiceProxy
from ..gatt import (
from bumble.gatt_client import ServiceProxy, ProfileServiceProxy, CharacteristicProxy
from bumble.gatt import (
GATT_DEVICE_INFORMATION_SERVICE,
GATT_FIRMWARE_REVISION_STRING_CHARACTERISTIC,
GATT_HARDWARE_REVISION_STRING_CHARACTERISTIC,
@@ -104,7 +104,16 @@ class DeviceInformationService(TemplateService):
class DeviceInformationServiceProxy(ProfileServiceProxy):
SERVICE_CLASS = DeviceInformationService
def __init__(self, service_proxy):
manufacturer_name: Optional[UTF8CharacteristicAdapter]
model_number: Optional[UTF8CharacteristicAdapter]
serial_number: Optional[UTF8CharacteristicAdapter]
hardware_revision: Optional[UTF8CharacteristicAdapter]
firmware_revision: Optional[UTF8CharacteristicAdapter]
software_revision: Optional[UTF8CharacteristicAdapter]
system_id: Optional[DelegatedCharacteristicAdapter]
ieee_regulatory_certification_data_list: Optional[CharacteristicProxy]
def __init__(self, service_proxy: ServiceProxy):
self.service_proxy = service_proxy
for field, uuid in (

View File

@@ -19,6 +19,7 @@ from __future__ import annotations
import logging
import asyncio
import collections
import dataclasses
import enum
from typing import Callable, Dict, List, Optional, Tuple, Union, TYPE_CHECKING
@@ -54,6 +55,7 @@ logger = logging.getLogger(__name__)
# fmt: off
RFCOMM_PSM = 0x0003
DEFAULT_RX_QUEUE_SIZE = 32
class FrameType(enum.IntEnum):
SABM = 0x2F # Control field [1,1,1,1,_,1,0,0] LSB-first
@@ -445,7 +447,8 @@ class DLC(EventEmitter):
RESET = 0x05
connection_result: Optional[asyncio.Future]
sink: Optional[Callable[[bytes], None]]
_sink: Optional[Callable[[bytes], None]]
_enqueued_rx_packets: collections.deque[bytes]
def __init__(
self,
@@ -466,10 +469,12 @@ class DLC(EventEmitter):
self.state = DLC.State.INIT
self.role = multiplexer.role
self.c_r = 1 if self.role == Multiplexer.Role.INITIATOR else 0
self.sink = None
self.connection_result = None
self.drained = asyncio.Event()
self.drained.set()
# Queued packets when sink is not set.
self._enqueued_rx_packets = collections.deque(maxlen=DEFAULT_RX_QUEUE_SIZE)
self._sink = None
# Compute the MTU
max_overhead = 4 + 1 # header with 2-byte length + fcs
@@ -477,6 +482,19 @@ class DLC(EventEmitter):
max_frame_size, self.multiplexer.l2cap_channel.peer_mtu - max_overhead
)
@property
def sink(self) -> Optional[Callable[[bytes], None]]:
return self._sink
@sink.setter
def sink(self, sink: Optional[Callable[[bytes], None]]) -> None:
self._sink = sink
# Dump queued packets to sink
if sink:
for packet in self._enqueued_rx_packets:
sink(packet) # pylint: disable=not-callable
self._enqueued_rx_packets.clear()
def change_state(self, new_state: State) -> None:
logger.debug(f'{self} state change -> {color(new_state.name, "magenta")}')
self.state = new_state
@@ -549,8 +567,15 @@ class DLC(EventEmitter):
f'rx_credits={self.rx_credits}: {data.hex()}'
)
if data:
if self.sink:
self.sink(data) # pylint: disable=not-callable
if self._sink:
self._sink(data) # pylint: disable=not-callable
else:
self._enqueued_rx_packets.append(data)
if (
self._enqueued_rx_packets.maxlen
and len(self._enqueued_rx_packets) >= self._enqueued_rx_packets.maxlen
):
logger.warning(f'DLC [{self.dlci}] received packet queue is full')
# Update the credits
if self.rx_credits > 0:

View File

@@ -23,11 +23,24 @@ import time
import usb.core
import usb.util
from typing import Optional
from usb.core import Device as UsbDevice
from usb.core import USBError
from usb.util import CTRL_TYPE_CLASS, CTRL_RECIPIENT_OTHER
from usb.legacy import REQ_SET_FEATURE, REQ_CLEAR_FEATURE, CLASS_HUB
from .common import Transport, ParserSource
from .. import hci
from ..colors import color
# -----------------------------------------------------------------------------
# Constant
# -----------------------------------------------------------------------------
USB_PORT_FEATURE_POWER = 8
POWER_CYCLE_DELAY = 1
RESET_DELAY = 3
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
@@ -214,6 +227,10 @@ async def open_pyusb_transport(spec: str) -> Transport:
usb_find = libusb_package.find
# Find the device according to the spec moniker
power_cycle = False
if spec.startswith('!'):
power_cycle = True
spec = spec[1:]
if ':' in spec:
vendor_id, product_id = spec.split(':')
device = usb_find(idVendor=int(vendor_id, 16), idProduct=int(product_id, 16))
@@ -245,6 +262,14 @@ async def open_pyusb_transport(spec: str) -> Transport:
raise ValueError('device not found')
logger.debug(f'USB Device: {device}')
# Power Cycle the device
if power_cycle:
try:
device = await _power_cycle(device) # type: ignore
except Exception as e:
logging.debug(e)
logging.info(f"Unable to power cycle {hex(device.idVendor)} {hex(device.idProduct)}") # type: ignore
# Collect the metadata
device_metadata = {'vendor_id': device.idVendor, 'product_id': device.idProduct}
@@ -308,3 +333,73 @@ async def open_pyusb_transport(spec: str) -> Transport:
packet_sink.start()
return UsbTransport(device, packet_source, packet_sink)
async def _power_cycle(device: UsbDevice) -> UsbDevice:
"""
For devices connected to compatible USB hubs: Performs a power cycle on a given USB device.
This involves temporarily disabling its port on the hub and then re-enabling it.
"""
device_path = f'{device.bus}-{".".join(map(str, device.port_numbers))}' # type: ignore
hub = _find_hub_by_device_path(device_path)
if hub:
try:
device_port = device.port_numbers[-1] # type: ignore
_set_port_status(hub, device_port, False)
await asyncio.sleep(POWER_CYCLE_DELAY)
_set_port_status(hub, device_port, True)
await asyncio.sleep(RESET_DELAY)
# Device needs to be find again otherwise it will appear as disconnected
return usb.core.find(idVendor=device.idVendor, idProduct=device.idProduct) # type: ignore
except USBError as e:
logger.error(f"Adjustment needed: Please revise the udev rule for device {hex(device.idVendor)}:{hex(device.idProduct)} for proper recognition.") # type: ignore
logger.error(e)
return device
def _set_port_status(device: UsbDevice, port: int, on: bool):
"""Sets the power status of a specific port on a USB hub."""
device.ctrl_transfer(
bmRequestType=CTRL_TYPE_CLASS | CTRL_RECIPIENT_OTHER,
bRequest=REQ_SET_FEATURE if on else REQ_CLEAR_FEATURE,
wIndex=port,
wValue=USB_PORT_FEATURE_POWER,
)
def _find_device_by_path(sys_path: str) -> Optional[UsbDevice]:
"""Finds a USB device based on its system path."""
bus_num, *port_parts = sys_path.split('-')
ports = [int(port) for port in port_parts[0].split('.')]
devices = usb.core.find(find_all=True, bus=int(bus_num))
if devices:
for device in devices:
if device.bus == int(bus_num) and list(device.port_numbers) == ports: # type: ignore
return device
return None
def _find_hub_by_device_path(sys_path: str) -> Optional[UsbDevice]:
"""Finds the USB hub associated with a specific device path."""
hub_sys_path = sys_path.rsplit('.', 1)[0]
hub_device = _find_device_by_path(hub_sys_path)
if hub_device is None:
return None
else:
return hub_device if _is_hub(hub_device) else None
def _is_hub(device: UsbDevice) -> bool:
"""Checks if a USB device is a hub"""
if device.bDeviceClass == CLASS_HUB: # type: ignore
return True
for config in device:
for interface in config:
if interface.bInterfaceClass == CLASS_HUB: # type: ignore
return True
return False

View File

@@ -18,6 +18,7 @@
from __future__ import annotations
import asyncio
import logging
import socket
from .common import Transport, StreamPacketSource
@@ -28,6 +29,13 @@ logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# A pass-through function to ease mock testing.
async def _create_server(*args, **kw_args):
await asyncio.get_running_loop().create_server(*args, **kw_args)
async def open_tcp_server_transport(spec: str) -> Transport:
'''
Open a TCP server transport.
@@ -38,7 +46,22 @@ async def open_tcp_server_transport(spec: str) -> Transport:
Example: _:9001
'''
local_host, local_port = spec.split(':')
return await _open_tcp_server_transport_impl(
host=local_host if local_host != '_' else None, port=int(local_port)
)
async def open_tcp_server_transport_with_socket(sock: socket.socket) -> Transport:
'''
Open a TCP server transport with an existing socket.
One reason to use this variant is to let python pick an unused port.
'''
return await _open_tcp_server_transport_impl(sock=sock)
async def _open_tcp_server_transport_impl(**kwargs) -> Transport:
class TcpServerTransport(Transport):
async def close(self):
await super().close()
@@ -77,13 +100,10 @@ async def open_tcp_server_transport(spec: str) -> Transport:
else:
logger.debug('no client, dropping packet')
local_host, local_port = spec.split(':')
packet_source = StreamPacketSource()
packet_sink = TcpServerPacketSink()
await asyncio.get_running_loop().create_server(
lambda: TcpServerProtocol(packet_source, packet_sink),
host=local_host if local_host != '_' else None,
port=int(local_port),
await _create_server(
lambda: TcpServerProtocol(packet_source, packet_sink), **kwargs
)
return TcpServerTransport(packet_source, packet_sink)

View File

@@ -61,7 +61,7 @@ async def func4(x, y):
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
print("MAIN: start, loop=", asyncio.get_running_loop())
print("MAIN: invoke func1")
func1(1, 2)

View File

@@ -21,23 +21,29 @@ import os
import logging
from bumble.colors import color
from bumble.device import Device
from bumble.hci import Address
from bumble.transport import open_transport
from bumble.profiles.battery_service import BatteryServiceProxy
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) != 3:
print('Usage: battery_client.py <transport-spec> <bluetooth-address>')
print('example: battery_client.py usb:0 E1:CA:72:48:C4:E8')
return
print('<<< connecting to HCI...')
async with await open_transport(sys.argv[1]) as (hci_source, hci_sink):
async with await open_transport(sys.argv[1]) as hci_transport:
print('<<< connected')
# Create and start a device
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
device = Device.with_hci(
'Bumble',
Address('F0:F1:F2:F3:F4:F5'),
hci_transport.source,
hci_transport.sink,
)
await device.power_on()
# Connect to the peer

View File

@@ -29,14 +29,16 @@ from bumble.profiles.battery_service import BatteryService
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) != 3:
print('Usage: python battery_server.py <device-config> <transport-spec>')
print('example: python battery_server.py device1.json usb:0')
return
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
# Add a Battery Service to the GATT sever
battery_service = BatteryService(lambda _: random.randint(0, 100))

View File

@@ -21,12 +21,13 @@ import os
import logging
from bumble.colors import color
from bumble.device import Device, Peer
from bumble.hci import Address
from bumble.profiles.device_information_service import DeviceInformationServiceProxy
from bumble.transport import open_transport
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) != 3:
print(
'Usage: device_information_client.py <transport-spec> <bluetooth-address>'
@@ -35,11 +36,16 @@ async def main():
return
print('<<< connecting to HCI...')
async with await open_transport(sys.argv[1]) as (hci_source, hci_sink):
async with await open_transport(sys.argv[1]) as hci_transport:
print('<<< connected')
# Create and start a device
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
device = Device.with_hci(
'Bumble',
Address('F0:F1:F2:F3:F4:F5'),
hci_transport.source,
hci_transport.sink,
)
await device.power_on()
# Connect to the peer

View File

@@ -28,14 +28,16 @@ from bumble.profiles.device_information_service import DeviceInformationService
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) != 3:
print('Usage: python device_info_server.py <device-config> <transport-spec>')
print('example: python device_info_server.py device1.json usb:0')
return
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
# Add a Device Information Service to the GATT sever
device_information_service = DeviceInformationService(
@@ -64,7 +66,7 @@ async def main():
# Go!
await device.power_on()
await device.start_advertising(auto_restart=True)
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -21,23 +21,29 @@ import os
import logging
from bumble.colors import color
from bumble.device import Device
from bumble.hci import Address
from bumble.transport import open_transport
from bumble.profiles.heart_rate_service import HeartRateServiceProxy
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) != 3:
print('Usage: heart_rate_client.py <transport-spec> <bluetooth-address>')
print('example: heart_rate_client.py usb:0 E1:CA:72:48:C4:E8')
return
print('<<< connecting to HCI...')
async with await open_transport(sys.argv[1]) as (hci_source, hci_sink):
async with await open_transport(sys.argv[1]) as hci_transport:
print('<<< connected')
# Create and start a device
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
device = Device.with_hci(
'Bumble',
Address('F0:F1:F2:F3:F4:F5'),
hci_transport.source,
hci_transport.sink,
)
await device.power_on()
# Connect to the peer

View File

@@ -33,14 +33,16 @@ from bumble.utils import AsyncRunner
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) != 3:
print('Usage: python heart_rate_server.py <device-config> <transport-spec>')
print('example: python heart_rate_server.py device1.json usb:0')
return
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
# Keep track of accumulated expended energy
energy_start_time = time.time()

350
examples/hfp_gateway.html Normal file
View File

@@ -0,0 +1,350 @@
<html data-bs-theme="dark">
<head>
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.2/dist/css/bootstrap.min.css" rel="stylesheet"
integrity="sha384-T3c6CoIi6uLrA9TneNEoa7RxnatzjcDSCmG1MXxSR1GAsXEV/Dwwykc2MPK8M2HN" crossorigin="anonymous">
<script src="https://unpkg.com/pcm-player"></script>
</head>
<body>
<nav class="navbar navbar-dark bg-primary">
<div class="container">
<span class="navbar-brand mb-0 h1">Bumble HFP Audio Gateway</span>
</div>
</nav>
<br>
<div class="container">
<label class="form-label">Send AT Response</label>
<div class="input-group mb-3">
<input type="text" class="form-control" placeholder="AT Response" aria-label="AT response" id="at_response">
<button class="btn btn-primary" type="button"
onclick="send_at_response(document.getElementById('at_response').value)">Send</button>
</div>
<div class="row">
<div class="col-3">
<label class="form-label">Speaker Volume</label>
<div class="input-group mb-3 col-auto">
<input type="text" class="form-control" placeholder="0 - 15" aria-label="Speaker Volume"
id="speaker_volume">
<button class="btn btn-primary" type="button"
onclick="send_at_response(`+VGS: ${document.getElementById('speaker_volume').value}`)">Set</button>
</div>
</div>
<div class="col-3">
<label class="form-label">Mic Volume</label>
<div class="input-group mb-3 col-auto">
<input type="text" class="form-control" placeholder="0 - 15" aria-label="Mic Volume"
id="mic_volume">
<button class="btn btn-primary" type="button"
onclick="send_at_response(`+VGM: ${document.getElementById('mic_volume').value}`)">Set</button>
</div>
</div>
<div class="col-3">
<label class="form-label">Browser Gain</label>
<input type="range" class="form-range" id="browser-gain" min="0" max="2" value="1" step="0.1" onchange="setGain()">
</div>
</div>
<div class="row">
<div class="col-auto">
<div class="input-group mb-3">
<span class="input-group-text">Codec</span>
<select class="form-select" id="codec">
<option selected value="1">CVSD</option>
<option value="2">MSBC</option>
</select>
</div>
</div>
<div class="col-auto">
<button class="btn btn-primary" onclick="negotiate_codec()">Negotiate Codec</button>
</div>
<div class="col-auto">
<button class="btn btn-primary" onclick="connect_sco()">Connect SCO</button>
</div>
<div class="col-auto">
<button class="btn btn-primary" onclick="disconnect_sco()">Disconnect SCO</button>
</div>
<div class="col-auto">
<button class="btn btn-danger" onclick="connectAudio()">Connect Audio</button>
</div>
</div>
<hr>
<div class="row">
<h4>AG Indicators</h2>
<div class="col-3">
<label class="form-label">call</label>
<div class="input-group mb-3 col-auto">
<select class="form-select" id="call">
<option selected value="0">Inactive</option>
<option value="1">Active</option>
</select>
<button class="btn btn-primary" type="button" onclick="update_ag_indicator('call')">Set</button>
</div>
</div>
<div class="col-3">
<label class="form-label">callsetup</label>
<div class="input-group mb-3 col-auto">
<select class="form-select" id="callsetup">
<option selected value="0">Idle</option>
<option value="1">Incoming</option>
<option value="2">Outgoing</option>
<option value="3">Remote Alerted</option>
</select>
<button class="btn btn-primary" type="button"
onclick="update_ag_indicator('callsetup')">Set</button>
</div>
</div>
<div class="col-3">
<label class="form-label">callheld</label>
<div class="input-group mb-3 col-auto">
<select class="form-select" id="callsetup">
<option selected value="0">0</option>
<option value="1">1</option>
<option value="2">2</option>
</select>
<button class="btn btn-primary" type="button"
onclick="update_ag_indicator('callheld')">Set</button>
</div>
</div>
<div class="col-3">
<label class="form-label">signal</label>
<div class="input-group mb-3 col-auto">
<select class="form-select" id="signal">
<option selected value="0">0</option>
<option value="1">1</option>
<option value="2">2</option>
<option value="3">3</option>
<option value="4">4</option>
<option value="5">5</option>
</select>
<button class="btn btn-primary" type="button"
onclick="update_ag_indicator('signal')">Set</button>
</div>
</div>
<div class="col-3">
<label class="form-label">roam</label>
<div class="input-group mb-3 col-auto">
<select class="form-select" id="roam">
<option selected value="0">0</option>
<option value="1">1</option>
</select>
<button class="btn btn-primary" type="button" onclick="update_ag_indicator('roam')">Set</button>
</div>
</div>
<div class="col-3">
<label class="form-label">battchg</label>
<div class="input-group mb-3 col-auto">
<select class="form-select" id="battchg">
<option selected value="0">0</option>
<option value="1">1</option>
<option value="2">2</option>
<option value="3">3</option>
<option value="4">4</option>
<option value="5">5</option>
</select>
<button class="btn btn-primary" type="button"
onclick="update_ag_indicator('battchg')">Set</button>
</div>
</div>
<div class="col-3">
<label class="form-label">service</label>
<div class="input-group mb-3 col-auto">
<select class="form-select" id="service">
<option selected value="0">0</option>
<option value="1">1</option>
</select>
<button class="btn btn-primary" type="button"
onclick="update_ag_indicator('service')">Set</button>
</div>
</div>
</div>
<hr>
<button class="btn btn-primary" onclick="send_at_response('+BVRA: 1')">Start Voice Assistant</button>
<button class="btn btn-primary" onclick="send_at_response('+BVRA: 0')">Stop Voice Assistant</button>
<hr>
<h4>Calls</h4>
<div id="call-lists">
<template id="call-template">
<div class="row call-row">
<div class="input-group mb-3">
<label class="input-group-text">Index</label>
<input class="form-control call-index" value="1">
<label class="input-group-text">Number</label>
<input class="form-control call-number">
<label class="input-group-text">Direction</label>
<select class="form-select call-direction">
<option selected value="0">Originated</option>
<option value="1">Terminated</option>
</select>
<label class="input-group-text">Status</label>
<select class="form-select call-status">
<option value="0">ACTIVE</option>
<option value="1">HELD</option>
<option value="2">DIALING</option>
<option value="3">ALERTING</option>
<option value="4">INCOMING</option>
<option value="5">WAITING</option>
</select>
<button class="btn btn-primary call-remover"></button>
</div>
</div>
</template>
</div>
<button class="btn btn-primary" onclick="add_call()"> Add Call</button>
<button class="btn btn-primary" onclick="update_calls()">🗘 Update Calls</button>
<hr>
<div id="socketStateContainer" class="bg-body-tertiary p-3 rounded-2">
<h3>Log</h3>
<code id="log" style="white-space: pre-line;"></code>
</div>
</div>
<script>
let atResponseInput = document.getElementById("at_response")
let gainInput = document.getElementById('browser-gain')
let log = document.getElementById("log")
let socket = new WebSocket('ws://localhost:8888');
let sampleRate = 0;
let player;
socket.binaryType = "arraybuffer";
socket.onopen = _ => {
log.textContent += 'SOCKET OPEN\n'
}
socket.onclose = _ => {
log.textContent += 'SOCKET CLOSED\n'
}
socket.onerror = (error) => {
log.textContent += 'SOCKET ERROR\n'
console.log(`ERROR: ${error}`)
}
socket.onmessage = function (message) {
if (typeof message.data === 'string' || message.data instanceof String) {
log.textContent += `<-- ${event.data}\n`
const jsonMessage = JSON.parse(event.data)
if (jsonMessage.type == 'speaker_volume') {
document.getElementById('speaker_volume').value = jsonMessage.level;
} else if (jsonMessage.type == 'microphone_volume') {
document.getElementById('microphone_volume').value = jsonMessage.level;
} else if (jsonMessage.type == 'sco_state_change') {
sampleRate = jsonMessage.sample_rate;
console.log(sampleRate);
if (player != null) {
player = new PCMPlayer({
inputCodec: 'Int16',
channels: 1,
sampleRate: sampleRate,
flushTime: 7.5,
});
player.volume(gainInput.value);
}
}
} else {
// BINARY audio data.
if (player == null) return;
player.feed(message.data);
}
};
function send(message) {
if (socket && socket.readyState == WebSocket.OPEN) {
let jsonMessage = JSON.stringify(message)
log.textContent += `--> ${jsonMessage}\n`
socket.send(jsonMessage)
} else {
log.textContent += 'NOT CONNECTED\n'
}
}
function send_at_response(response) {
send({ type: 'at_response', response: response })
}
function update_ag_indicator(indicator) {
const value = document.getElementById(indicator).value
send({ type: 'ag_indicator', indicator: indicator, value: value })
}
function connect_sco() {
send({ type: 'connect_sco' })
}
function negotiate_codec() {
const codec = document.getElementById('codec').value
send({ type: 'negotiate_codec', codec: codec })
}
function disconnect_sco() {
send({ type: 'disconnect_sco' })
}
function add_call() {
let callLists = document.getElementById('call-lists');
let template = document.getElementById('call-template');
let newNode = document.importNode(template.content, true);
newNode.querySelector('.call-remover').onclick = function (event) {
event.target.closest('.call-row').remove();
}
callLists.appendChild(newNode);
}
function update_calls() {
let callLists = document.getElementById('call-lists');
send({
type: 'update_calls',
calls: Array.from(
callLists.querySelectorAll('.call-row')).map(
function (element) {
return {
index: element.querySelector('.call-index').value,
number: element.querySelector('.call-number').value,
direction: element.querySelector('.call-direction').value,
status: element.querySelector('.call-status').value,
}
}
),
}
)
}
function connectAudio() {
player = new PCMPlayer({
inputCodec: 'Int16',
channels: 1,
sampleRate: sampleRate,
flushTime: 7.5,
});
player.volume(gainInput.value);
}
function setGain() {
if (player != null) {
player.volume(gainInput.value);
}
}
</script>
</div>
</body>
</html>

View File

@@ -1,4 +1,5 @@
{
"name": "Bumble Phone",
"class_of_device": 6291980
"class_of_device": 6291980,
"keystore": "JsonKeyStore"
}

View File

@@ -416,7 +416,7 @@ async def keyboard_device(device, command):
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 4:
print(
'Usage: python keyboard.py <device-config> <transport-spec> <command>'
@@ -434,9 +434,11 @@ async def main():
)
return
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
# Create a device to manage the host
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
command = sys.argv[3]
if command == 'connect':

View File

@@ -139,18 +139,20 @@ async def find_a2dp_service(connection):
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 4:
print('Usage: run_a2dp_info.py <device-config> <transport-spec> <bt-addr>')
print('example: run_a2dp_info.py classic1.json usb:0 14:7D:DA:4E:53:A8')
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.classic_enabled = True
# Start the controller
@@ -187,7 +189,7 @@ async def main():
client = await AVDTP_Protocol.connect(connection, avdtp_version)
# Discover all endpoints on the remote device
endpoints = await client.discover_remote_endpoints()
endpoints = list(await client.discover_remote_endpoints())
print(f'@@@ Found {len(endpoints)} endpoints')
for endpoint in endpoints:
print('@@@', endpoint)

View File

@@ -19,6 +19,7 @@ import asyncio
import sys
import os
import logging
from typing import Any, Dict
from bumble.device import Device
from bumble.transport import open_transport_or_link
@@ -41,7 +42,7 @@ from bumble.a2dp import (
SbcMediaCodecInformation,
)
Context = {'output': None}
Context: Dict[Any, Any] = {'output': None}
# -----------------------------------------------------------------------------
@@ -104,7 +105,7 @@ def on_rtp_packet(packet):
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 4:
print(
'Usage: run_a2dp_sink.py <device-config> <transport-spec> <sbc-file> '
@@ -114,14 +115,16 @@ async def main():
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
with open(sys.argv[3], 'wb') as sbc_file:
Context['output'] = sbc_file
# Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.classic_enabled = True
# Setup the SDP to expose the sink service
@@ -162,7 +165,7 @@ async def main():
await device.set_discoverable(True)
await device.set_connectable(True)
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -114,7 +114,7 @@ async def stream_packets(read_function, protocol):
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 4:
print(
'Usage: run_a2dp_source.py <device-config> <transport-spec> <sbc-file> '
@@ -126,11 +126,13 @@ async def main():
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.classic_enabled = True
# Setup the SDP to expose the SRC service
@@ -186,7 +188,7 @@ async def main():
await device.set_discoverable(True)
await device.set_connectable(True)
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -28,7 +28,7 @@ from bumble.transport import open_transport_or_link
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 3:
print(
'Usage: run_advertiser.py <config-file> <transport-spec> [type] [address]'
@@ -50,10 +50,12 @@ async def main():
target = None
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
if advertising_type.is_scannable:
device.scan_response_data = bytes(
@@ -66,7 +68,7 @@ async def main():
await device.power_on()
await device.start_advertising(advertising_type=advertising_type, target=target)
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -49,7 +49,7 @@ ASHA_LE_PSM_OUT_CHARACTERISTIC = UUID(
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) != 4:
print(
'Usage: python run_asha_sink.py <device-config> <transport-spec> '
@@ -60,8 +60,10 @@ async def main():
audio_out = open(sys.argv[3], 'wb')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
# Handler for audio control commands
def on_audio_control_point_write(_connection, value):
@@ -197,7 +199,7 @@ async def main():
await device.power_on()
await device.start_advertising(auto_restart=True)
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -331,7 +331,7 @@ class Delegate(avrcp.Delegate):
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 3:
print(
'Usage: run_avrcp_controller.py <device-config> <transport-spec> '
@@ -341,11 +341,13 @@ async def main():
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.classic_enabled = True
# Setup the SDP to expose the sink service

View File

@@ -32,7 +32,7 @@ from bumble.sdp import (
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 3:
print(
'Usage: run_classic_connect.py <device-config> <transport-spec> '
@@ -42,11 +42,13 @@ async def main():
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.classic_enabled = True
device.le_enabled = False
await device.power_on()

View File

@@ -91,18 +91,20 @@ SDP_SERVICE_RECORDS = {
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 3:
print('Usage: run_classic_discoverable.py <device-config> <transport-spec>')
print('example: run_classic_discoverable.py classic1.json usb:04b4:f901')
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.classic_enabled = True
device.sdp_service_records = SDP_SERVICE_RECORDS
await device.power_on()
@@ -111,7 +113,7 @@ async def main():
await device.set_discoverable(True)
await device.set_connectable(True)
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -20,8 +20,8 @@ import sys
import os
import logging
from bumble.colors import color
from bumble.device import Device
from bumble.hci import Address
from bumble.transport import open_transport_or_link
from bumble.core import DeviceClass
@@ -53,22 +53,27 @@ class DiscoveryListener(Device.Listener):
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) != 2:
print('Usage: run_classic_discovery.py <transport-spec>')
print('example: run_classic_discovery.py usb:04b4:f901')
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[1]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[1]) as hci_transport:
print('<<< connected')
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
device = Device.with_hci(
'Bumble',
Address('F0:F1:F2:F3:F4:F5'),
hci_transport.source,
hci_transport.sink,
)
device.listener = DiscoveryListener()
await device.power_on()
await device.start_discovery()
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -25,7 +25,7 @@ from bumble.transport import open_transport_or_link
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 3:
print(
'Usage: run_connect_and_encrypt.py <device-config> <transport-spec> '
@@ -37,11 +37,13 @@ async def main():
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
await device.power_on()
# Connect to the peer
@@ -56,7 +58,7 @@ async def main():
print(f'!!! Encryption failed: {error}')
return
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -36,7 +36,7 @@ from bumble.transport import open_transport_or_link
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) != 4:
print(
'Usage: run_controller.py <controller-address> <device-config> '
@@ -49,7 +49,7 @@ async def main():
return
print('>>> connecting to HCI...')
async with await open_transport_or_link(sys.argv[3]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[3]) as hci_transport:
print('>>> connected')
# Create a local link
@@ -57,7 +57,10 @@ async def main():
# Create a first controller using the packet source/sink as its host interface
controller1 = Controller(
'C1', host_source=hci_source, host_sink=hci_sink, link=link
'C1',
host_source=hci_transport.source,
host_sink=hci_transport.sink,
link=link,
)
controller1.random_address = sys.argv[1]
@@ -98,7 +101,7 @@ async def main():
await device.start_advertising()
await device.start_scanning()
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -20,9 +20,9 @@ import asyncio
import sys
import os
from bumble.colors import color
from bumble.device import Device
from bumble.controller import Controller
from bumble.hci import Address
from bumble.link import LocalLink
from bumble.transport import open_transport_or_link
@@ -45,14 +45,14 @@ class ScannerListener(Device.Listener):
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) != 2:
print('Usage: run_controller.py <transport-spec>')
print('example: run_controller_with_scanner.py serial:/dev/pts/14,1000000')
return
print('>>> connecting to HCI...')
async with await open_transport_or_link(sys.argv[1]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[1]) as hci_transport:
print('>>> connected')
# Create a local link
@@ -60,22 +60,25 @@ async def main():
# Create a first controller using the packet source/sink as its host interface
controller1 = Controller(
'C1', host_source=hci_source, host_sink=hci_sink, link=link
'C1',
host_source=hci_transport.source,
host_sink=hci_transport.sink,
link=link,
public_address='E0:E1:E2:E3:E4:E5',
)
controller1.address = 'E0:E1:E2:E3:E4:E5'
# Create a second controller using the same link
controller2 = Controller('C2', link=link)
# Create a device with a scanner listener
device = Device.with_hci(
'Bumble', 'F0:F1:F2:F3:F4:F5', controller2, controller2
'Bumble', Address('F0:F1:F2:F3:F4:F5'), controller2, controller2
)
device.listener = ScannerListener()
await device.power_on()
await device.start_scanning()
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -20,31 +20,36 @@ import sys
import os
import logging
from bumble.colors import color
from bumble.hci import Address
from bumble.device import Device
from bumble.transport import open_transport_or_link
from bumble.snoop import BtSnooper
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) != 3:
print('Usage: run_device_with_snooper.py <transport-spec> <snoop-file>')
print('example: run_device_with_snooper.py usb:0 btsnoop.log')
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[1]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[1]) as hci_transport:
print('<<< connected')
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
device = Device.with_hci(
'Bumble',
Address('F0:F1:F2:F3:F4:F5'),
hci_transport.source,
hci_transport.sink,
)
with open(sys.argv[2], "wb") as snoop_file:
device.host.snooper = BtSnooper(snoop_file)
await device.power_on()
await device.start_scanning()
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -69,7 +69,7 @@ class Listener(Device.Listener):
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 3:
print(
'Usage: run_gatt_client.py <device-config> <transport-spec> '
@@ -79,11 +79,13 @@ async def main():
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device to manage the host, with a custom listener
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.listener = Listener(device)
await device.power_on()

View File

@@ -19,21 +19,21 @@ import asyncio
import os
import logging
from bumble.colors import color
from bumble.core import ProtocolError
from bumble.controller import Controller
from bumble.device import Device, Peer
from bumble.hci import Address
from bumble.host import Host
from bumble.link import LocalLink
from bumble.gatt import (
Service,
Characteristic,
Descriptor,
show_services,
GATT_CHARACTERISTIC_USER_DESCRIPTION_DESCRIPTOR,
GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
GATT_DEVICE_INFORMATION_SERVICE,
)
from bumble.gatt_client import show_services
# -----------------------------------------------------------------------------
@@ -43,7 +43,7 @@ class ServerListener(Device.Listener):
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
# Create a local link
link = LocalLink()
@@ -51,14 +51,18 @@ async def main():
client_controller = Controller("client controller", link=link)
client_host = Host()
client_host.controller = client_controller
client_device = Device("client", address='F0:F1:F2:F3:F4:F5', host=client_host)
client_device = Device(
"client", address=Address('F0:F1:F2:F3:F4:F5'), host=client_host
)
await client_device.power_on()
# Setup a stack for the server
server_controller = Controller("server controller", link=link)
server_host = Host()
server_host.controller = server_controller
server_device = Device("server", address='F6:F7:F8:F9:FA:FB', host=server_host)
server_device = Device(
"server", address=Address('F6:F7:F8:F9:FA:FB'), host=server_host
)
server_device.listener = ServerListener()
await server_device.power_on()

View File

@@ -71,7 +71,7 @@ def my_custom_write_with_error(connection, value):
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 3:
print(
'Usage: run_gatt_server.py <device-config> <transport-spec> '
@@ -81,11 +81,13 @@ async def main():
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device to manage the host
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.listener = Listener(device)
# Add a few entries to the device's GATT server
@@ -146,7 +148,7 @@ async def main():
else:
await device.start_advertising(auto_restart=True)
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -16,240 +16,270 @@
# Imports
# -----------------------------------------------------------------------------
import asyncio
import json
import sys
import os
import io
import logging
import websockets
from bumble.colors import color
from typing import Optional
import bumble.core
from bumble.device import Device
from bumble.transport import open_transport_or_link
from bumble.core import (
BT_HANDSFREE_SERVICE,
BT_RFCOMM_PROTOCOL_ID,
BT_BR_EDR_TRANSPORT,
)
from bumble import rfcomm, hfp
from bumble.hci import HCI_SynchronousDataPacket
from bumble.sdp import (
Client as SDP_Client,
DataElement,
ServiceAttribute,
SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
)
from bumble import hci, rfcomm, hfp
logger = logging.getLogger(__name__)
ws: Optional[websockets.WebSocketServerProtocol] = None
ag_protocol: Optional[hfp.AgProtocol] = None
source_file: Optional[io.BufferedReader] = None
# -----------------------------------------------------------------------------
# pylint: disable-next=too-many-nested-blocks
async def list_rfcomm_channels(device, connection):
# Connect to the SDP Server
sdp_client = SDP_Client(connection)
await sdp_client.connect()
# Search for services that support the Handsfree Profile
search_result = await sdp_client.search_attributes(
[BT_HANDSFREE_SERVICE],
[
SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
def _default_configuration() -> hfp.AgConfiguration:
return hfp.AgConfiguration(
supported_ag_features=[
hfp.AgFeature.HF_INDICATORS,
hfp.AgFeature.IN_BAND_RING_TONE_CAPABILITY,
hfp.AgFeature.REJECT_CALL,
hfp.AgFeature.CODEC_NEGOTIATION,
hfp.AgFeature.ESCO_S4_SETTINGS_SUPPORTED,
hfp.AgFeature.ENHANCED_CALL_STATUS,
],
supported_ag_indicators=[
hfp.AgIndicatorState.call(),
hfp.AgIndicatorState.callsetup(),
hfp.AgIndicatorState.callheld(),
hfp.AgIndicatorState.service(),
hfp.AgIndicatorState.signal(),
hfp.AgIndicatorState.roam(),
hfp.AgIndicatorState.battchg(),
],
supported_hf_indicators=[
hfp.HfIndicator.ENHANCED_SAFETY,
hfp.HfIndicator.BATTERY_LEVEL,
],
supported_ag_call_hold_operations=[],
supported_audio_codecs=[hfp.AudioCodec.CVSD, hfp.AudioCodec.MSBC],
)
print(color('==================================', 'blue'))
print(color('Handsfree Services:', 'yellow'))
rfcomm_channels = []
# pylint: disable-next=too-many-nested-blocks
for attribute_list in search_result:
# Look for the RFCOMM Channel number
protocol_descriptor_list = ServiceAttribute.find_attribute_in_list(
attribute_list, SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID
def send_message(type: str, **kwargs) -> None:
if ws:
asyncio.create_task(ws.send(json.dumps({'type': type, **kwargs})))
def on_speaker_volume(level: int):
send_message(type='speaker_volume', level=level)
def on_microphone_volume(level: int):
send_message(type='microphone_volume', level=level)
def on_sco_state_change(codec: int):
if codec == hfp.AudioCodec.CVSD:
sample_rate = 8000
elif codec == hfp.AudioCodec.MSBC:
sample_rate = 16000
else:
sample_rate = 0
send_message(type='sco_state_change', sample_rate=sample_rate)
def on_sco_packet(packet: hci.HCI_SynchronousDataPacket):
if ws:
asyncio.create_task(ws.send(packet.data))
if source_file and (pcm_data := source_file.read(packet.data_total_length)):
assert ag_protocol
host = ag_protocol.dlc.multiplexer.l2cap_channel.connection.device.host
host.send_hci_packet(
hci.HCI_SynchronousDataPacket(
connection_handle=packet.connection_handle,
packet_status=0,
data_total_length=len(pcm_data),
data=pcm_data,
)
)
if protocol_descriptor_list:
for protocol_descriptor in protocol_descriptor_list.value:
if len(protocol_descriptor.value) >= 2:
if protocol_descriptor.value[0].value == BT_RFCOMM_PROTOCOL_ID:
print(color('SERVICE:', 'green'))
print(
color(' RFCOMM Channel:', 'cyan'),
protocol_descriptor.value[1].value,
)
rfcomm_channels.append(protocol_descriptor.value[1].value)
# List profiles
bluetooth_profile_descriptor_list = (
ServiceAttribute.find_attribute_in_list(
attribute_list,
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
)
)
if bluetooth_profile_descriptor_list:
if bluetooth_profile_descriptor_list.value:
if (
bluetooth_profile_descriptor_list.value[0].type
== DataElement.SEQUENCE
):
bluetooth_profile_descriptors = (
bluetooth_profile_descriptor_list.value
)
else:
# Sometimes, instead of a list of lists, we just
# find a list. Fix that
bluetooth_profile_descriptors = [
bluetooth_profile_descriptor_list
]
print(color(' Profiles:', 'green'))
for (
bluetooth_profile_descriptor
) in bluetooth_profile_descriptors:
version_major = (
bluetooth_profile_descriptor.value[1].value >> 8
)
version_minor = (
bluetooth_profile_descriptor.value[1].value
& 0xFF
)
print(
' '
f'{bluetooth_profile_descriptor.value[0].value}'
f' - version {version_major}.{version_minor}'
)
def on_hfp_state_change(connected: bool):
send_message(type='hfp_state_change', connected=connected)
# List service classes
service_class_id_list = ServiceAttribute.find_attribute_in_list(
attribute_list, SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID
)
if service_class_id_list:
if service_class_id_list.value:
print(color(' Service Classes:', 'green'))
for service_class_id in service_class_id_list.value:
print(' ', service_class_id.value)
await sdp_client.disconnect()
return rfcomm_channels
async def ws_server(ws_client: websockets.WebSocketServerProtocol, path: str):
del path
global ws
ws = ws_client
async for message in ws_client:
if not ag_protocol:
continue
json_message = json.loads(message)
message_type = json_message['type']
connection = ag_protocol.dlc.multiplexer.l2cap_channel.connection
device = connection.device
try:
if message_type == 'at_response':
ag_protocol.send_response(json_message['response'])
elif message_type == 'ag_indicator':
ag_protocol.update_ag_indicator(
hfp.AgIndicator(json_message['indicator']),
int(json_message['value']),
)
elif message_type == 'negotiate_codec':
codec = hfp.AudioCodec(int(json_message['codec']))
await ag_protocol.negotiate_codec(codec)
elif message_type == 'connect_sco':
if ag_protocol.active_codec == hfp.AudioCodec.CVSD:
esco_param = hfp.ESCO_PARAMETERS[
hfp.DefaultCodecParameters.ESCO_CVSD_S4
]
elif ag_protocol.active_codec == hfp.AudioCodec.MSBC:
esco_param = hfp.ESCO_PARAMETERS[
hfp.DefaultCodecParameters.ESCO_MSBC_T2
]
else:
raise ValueError(f'Unsupported codec {codec}')
await device.send_command(
hci.HCI_Enhanced_Setup_Synchronous_Connection_Command(
connection_handle=connection.handle, **esco_param.asdict()
)
)
elif message_type == 'disconnect_sco':
# Copy the values to avoid iteration error.
for sco_link in list(device.sco_links.values()):
await sco_link.disconnect()
elif message_type == 'update_calls':
ag_protocol.calls = [
hfp.CallInfo(
index=int(call['index']),
direction=hfp.CallInfoDirection(int(call['direction'])),
status=hfp.CallInfoStatus(int(call['status'])),
number=call['number'],
multi_party=hfp.CallInfoMultiParty.NOT_IN_CONFERENCE,
mode=hfp.CallInfoMode.VOICE,
)
for call in json_message['calls']
]
except Exception as e:
send_message(type='error', message=e)
# -----------------------------------------------------------------------------
async def main():
if len(sys.argv) < 4:
async def main() -> None:
if len(sys.argv) < 3:
print(
'Usage: run_hfp_gateway.py <device-config> <transport-spec> '
'<bluetooth-address>'
'[bluetooth-address] [wav-file-for-source]'
)
print(
' specifying a channel number, or "discover" to list all RFCOMM channels'
'example: run_hfp_gateway.py hfp_gateway.json usb:0 E1:CA:72:48:C4:E8 sample.wav'
)
print('example: run_hfp_gateway.py hfp_gateway.json usb:0 E1:CA:72:48:C4:E8')
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.classic_enabled = True
await device.power_on()
# Connect to a peer
target_address = sys.argv[3]
print(f'=== Connecting to {target_address}...')
connection = await device.connect(target_address, transport=BT_BR_EDR_TRANSPORT)
print(f'=== Connected to {connection.peer_address}!')
rfcomm_server = rfcomm.Server(device)
configuration = _default_configuration()
# Get a list of all the Handsfree services (should only be 1)
channels = await list_rfcomm_channels(device, connection)
if len(channels) == 0:
print('!!! no service found')
return
def on_dlc(dlc: rfcomm.DLC):
global ag_protocol
ag_protocol = hfp.AgProtocol(dlc, configuration)
ag_protocol.on('speaker_volume', on_speaker_volume)
ag_protocol.on('microphone_volume', on_microphone_volume)
on_hfp_state_change(True)
dlc.multiplexer.l2cap_channel.on(
'close', lambda: on_hfp_state_change(False)
)
# Pick the first one
channel = channels[0]
channel = rfcomm_server.listen(on_dlc)
device.sdp_service_records = {
1: hfp.make_ag_sdp_records(1, channel, configuration)
}
# Request authentication
print('*** Authenticating...')
await connection.authenticate()
print('*** Authenticated')
def on_sco_connection(sco_link):
assert ag_protocol
on_sco_state_change(ag_protocol.active_codec)
sco_link.on('disconnection', lambda _: on_sco_state_change(0))
sco_link.on('pdu', on_sco_packet)
# Enable encryption
print('*** Enabling encryption...')
await connection.encrypt()
print('*** Encryption on')
device.on('sco_connection', on_sco_connection)
if len(sys.argv) >= 4:
# Connect to a peer
target_address = sys.argv[3]
print(f'=== Connecting to {target_address}...')
connection = await device.connect(
target_address, transport=BT_BR_EDR_TRANSPORT
)
print(f'=== Connected to {connection.peer_address}!')
# Create a client and start it
print('@@@ Starting to RFCOMM client...')
rfcomm_client = rfcomm.Client(connection)
rfcomm_mux = await rfcomm_client.start()
print('@@@ Started')
# Get a list of all the Handsfree services (should only be 1)
if not (hfp_record := await hfp.find_hf_sdp_record(connection)):
print('!!! no service found')
return
print(f'### Opening session for channel {channel}...')
try:
session = await rfcomm_mux.open_dlc(channel)
print('### Session open', session)
except bumble.core.ConnectionError as error:
print(f'### Session open failed: {error}')
await rfcomm_mux.disconnect()
print('@@@ Disconnected from RFCOMM server')
return
# Pick the first one
channel, version, hf_sdp_features = hfp_record
print(f'HF version: {version}')
print(f'HF features: {hf_sdp_features}')
def on_sco(connection_handle: int, packet: HCI_SynchronousDataPacket):
# Reset packet and loopback
packet.packet_status = 0
device.host.send_hci_packet(packet)
# Request authentication
print('*** Authenticating...')
await connection.authenticate()
print('*** Authenticated')
device.host.on('sco_packet', on_sco)
# Enable encryption
print('*** Enabling encryption...')
await connection.encrypt()
print('*** Encryption on')
# Protocol loop (just for testing at this point)
protocol = hfp.HfpProtocol(session)
while True:
line = await protocol.next_line()
# Create a client and start it
print('@@@ Starting to RFCOMM client...')
rfcomm_client = rfcomm.Client(connection)
rfcomm_mux = await rfcomm_client.start()
print('@@@ Started')
if line.startswith('AT+BRSF='):
protocol.send_response_line('+BRSF: 30')
protocol.send_response_line('OK')
elif line.startswith('AT+CIND=?'):
protocol.send_response_line(
'+CIND: ("call",(0,1)),("callsetup",(0-3)),("service",(0-1)),'
'("signal",(0-5)),("roam",(0,1)),("battchg",(0-5)),'
'("callheld",(0-2))'
)
protocol.send_response_line('OK')
elif line.startswith('AT+CIND?'):
protocol.send_response_line('+CIND: 0,0,1,4,1,5,0')
protocol.send_response_line('OK')
elif line.startswith('AT+CMER='):
protocol.send_response_line('OK')
elif line.startswith('AT+CHLD=?'):
protocol.send_response_line('+CHLD: 0')
protocol.send_response_line('OK')
elif line.startswith('AT+BTRH?'):
protocol.send_response_line('+BTRH: 0')
protocol.send_response_line('OK')
elif line.startswith('AT+CLIP='):
protocol.send_response_line('OK')
elif line.startswith('AT+VGS='):
protocol.send_response_line('OK')
elif line.startswith('AT+BIA='):
protocol.send_response_line('OK')
elif line.startswith('AT+BVRA='):
protocol.send_response_line(
'+BVRA: 1,1,12AA,1,1,"Message 1 from Janina"'
)
elif line.startswith('AT+XEVENT='):
protocol.send_response_line('OK')
elif line.startswith('AT+XAPL='):
protocol.send_response_line('OK')
else:
print(color('UNSUPPORTED AT COMMAND', 'red'))
protocol.send_response_line('ERROR')
print(f'### Opening session for channel {channel}...')
try:
session = await rfcomm_mux.open_dlc(channel)
print('### Session open', session)
except bumble.core.ConnectionError as error:
print(f'### Session open failed: {error}')
await rfcomm_mux.disconnect()
print('@@@ Disconnected from RFCOMM server')
return
await hci_source.wait_for_termination()
on_dlc(session)
await websockets.serve(ws_server, port=8888)
if len(sys.argv) >= 5:
global source_file
source_file = open(sys.argv[4], 'rb')
# Skip header
source_file.seek(44)
await hci_transport.source.terminated
# -----------------------------------------------------------------------------

View File

@@ -37,7 +37,7 @@ hf_protocol: Optional[HfProtocol] = None
# -----------------------------------------------------------------------------
def on_dlc(dlc: rfcomm.DLC, configuration: hfp.Configuration):
def on_dlc(dlc: rfcomm.DLC, configuration: hfp.HfConfiguration):
print('*** DLC connected', dlc)
global hf_protocol
hf_protocol = HfProtocol(dlc, configuration)
@@ -84,19 +84,19 @@ def on_dlc(dlc: rfcomm.DLC, configuration: hfp.Configuration):
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 3:
print('Usage: run_classic_hfp.py <device-config> <transport-spec>')
print('example: run_classic_hfp.py classic2.json usb:04b4:f901')
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
# Hands-Free profile configuration.
# TODO: load configuration from file.
configuration = hfp.Configuration(
configuration = hfp.HfConfiguration(
supported_hf_features=[
hfp.HfFeature.THREE_WAY_CALLING,
hfp.HfFeature.REMOTE_VOLUME_CONTROL,
@@ -116,7 +116,9 @@ async def main():
)
# Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.classic_enabled = True
# Create and register a server
@@ -128,7 +130,9 @@ async def main():
# Advertise the HFP RFComm channel in the SDP
device.sdp_service_records = {
0x00010001: hfp.sdp_records(0x00010001, channel_number, configuration)
0x00010001: hfp.make_hf_sdp_records(
0x00010001, channel_number, configuration
)
}
# Let's go!
@@ -164,7 +168,7 @@ async def main():
await websockets.serve(serve, 'localhost', 8989)
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -489,7 +489,7 @@ async def keyboard_device(hid_device):
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 3:
print(
'Usage: python run_hid_device.py <device-config> <transport-spec> <command>'
@@ -601,11 +601,13 @@ async def main():
asyncio.create_task(handle_virtual_cable_unplug())
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.classic_enabled = True
# Create and register HID device
@@ -742,7 +744,7 @@ async def main():
print("Executing in Web mode")
await keyboard_device(hid_device)
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -275,7 +275,7 @@ async def get_stream_reader(pipe) -> asyncio.StreamReader:
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 4:
print(
'Usage: run_hid_host.py <device-config> <transport-spec> '
@@ -324,11 +324,13 @@ async def main():
asyncio.create_task(handle_virtual_cable_unplug())
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< CONNECTED')
# Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.classic_enabled = True
# Create HID host and start it
@@ -557,7 +559,7 @@ async def main():
# Interrupt Channel
await hid_host.connect_interrupt_channel()
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -57,18 +57,20 @@ def on_my_characteristic_subscription(peer, enabled):
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 3:
print('Usage: run_notifier.py <device-config> <transport-spec>')
print('example: run_notifier.py device1.json usb:0')
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device to manage the host
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.listener = Listener(device)
# Add a few entries to the device's GATT server

View File

@@ -165,7 +165,7 @@ async def tcp_server(tcp_port, rfcomm_session):
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 5:
print(
'Usage: run_rfcomm_client.py <device-config> <transport-spec> '
@@ -178,11 +178,13 @@ async def main():
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.classic_enabled = True
await device.power_on()
@@ -192,8 +194,8 @@ async def main():
connection = await device.connect(target_address, transport=BT_BR_EDR_TRANSPORT)
print(f'=== Connected to {connection.peer_address}!')
channel = sys.argv[4]
if channel == 'discover':
channel_str = sys.argv[4]
if channel_str == 'discover':
await list_rfcomm_channels(connection)
return
@@ -213,7 +215,7 @@ async def main():
rfcomm_mux = await rfcomm_client.start()
print('@@@ Started')
channel = int(channel)
channel = int(channel_str)
print(f'### Opening session for channel {channel}...')
try:
session = await rfcomm_mux.open_dlc(channel)
@@ -229,7 +231,7 @@ async def main():
tcp_port = int(sys.argv[5])
asyncio.create_task(tcp_server(tcp_port, session))
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -107,7 +107,7 @@ class TcpServer:
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 4:
print(
'Usage: run_rfcomm_server.py <device-config> <transport-spec> '
@@ -124,11 +124,13 @@ async def main():
uuid = 'E6D55659-C8B4-4B85-96BB-B1143AF6D3AE'
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.classic_enabled = True
# Create a TCP server
@@ -153,7 +155,7 @@ async def main():
await device.set_discoverable(True)
await device.set_connectable(True)
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -20,27 +20,31 @@ import sys
import os
import logging
from bumble.colors import color
from bumble.hci import Address
from bumble.device import Device
from bumble.transport import open_transport_or_link
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 2:
print('Usage: run_scanner.py <transport-spec> [filter]')
print('example: run_scanner.py usb:0')
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[1]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[1]) as hci_transport:
print('<<< connected')
filter_duplicates = len(sys.argv) == 3 and sys.argv[2] == 'filter'
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
device = Device.with_hci(
'Bumble',
Address('F0:F1:F2:F3:F4:F5'),
hci_transport.source,
hci_transport.sink,
)
@device.on('advertisement')
def _(advertisement):
def on_adv(advertisement):
address_type_string = ('PUBLIC', 'RANDOM', 'PUBLIC_ID', 'RANDOM_ID')[
advertisement.address.address_type
]
@@ -67,10 +71,11 @@ async def main():
f'{advertisement.data.to_string(separator)}'
)
device.on('advertisement', on_adv)
await device.power_on()
await device.start_scanning(filter_duplicates=filter_duplicates)
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -33,7 +33,6 @@ include_package_data = True
install_requires =
aiohttp ~= 3.8; platform_system!='Emscripten'
appdirs >= 1.4; platform_system!='Emscripten'
bt-test-interfaces >= 0.0.6; platform_system!='Emscripten'
click >= 8.1.3; platform_system!='Emscripten'
cryptography == 39; platform_system!='Emscripten'
# Pyodide bundles a version of cryptography that is built for wasm, which may not match the
@@ -83,7 +82,7 @@ build =
build >= 0.7
test =
pytest >= 8.0
pytest-asyncio >= 0.21.1
pytest-asyncio >= 0.23.5
pytest-html >= 3.2.0
coverage >= 6.4
development =
@@ -100,6 +99,8 @@ development =
avatar =
pandora-avatar == 0.0.9
rootcanal == 1.10.0 ; python_version>='3.10'
pandora =
bt-test-interfaces >= 0.0.6
documentation =
mkdocs >= 1.4.0
mkdocs-material >= 8.5.6

View File

@@ -19,8 +19,9 @@ import asyncio
import logging
import os
import pytest
import pytest_asyncio
from typing import Tuple
from typing import Tuple, Optional
from .test_utils import TwoDevices
from bumble import core
@@ -35,10 +36,94 @@ from bumble import hci
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
def _default_hf_configuration() -> hfp.HfConfiguration:
return hfp.HfConfiguration(
supported_hf_features=[
hfp.HfFeature.CODEC_NEGOTIATION,
hfp.HfFeature.ESCO_S4_SETTINGS_SUPPORTED,
hfp.HfFeature.HF_INDICATORS,
hfp.HfFeature.ENHANCED_CALL_STATUS,
hfp.HfFeature.THREE_WAY_CALLING,
hfp.HfFeature.CLI_PRESENTATION_CAPABILITY,
],
supported_hf_indicators=[
hfp.HfIndicator.ENHANCED_SAFETY,
hfp.HfIndicator.BATTERY_LEVEL,
],
supported_audio_codecs=[
hfp.AudioCodec.CVSD,
hfp.AudioCodec.MSBC,
],
)
# -----------------------------------------------------------------------------
def _default_hf_sdp_features() -> hfp.HfSdpFeature:
return (
hfp.HfSdpFeature.WIDE_BAND
| hfp.HfSdpFeature.THREE_WAY_CALLING
| hfp.HfSdpFeature.CLI_PRESENTATION_CAPABILITY
)
# -----------------------------------------------------------------------------
def _default_ag_configuration() -> hfp.AgConfiguration:
return hfp.AgConfiguration(
supported_ag_features=[
hfp.AgFeature.HF_INDICATORS,
hfp.AgFeature.IN_BAND_RING_TONE_CAPABILITY,
hfp.AgFeature.REJECT_CALL,
hfp.AgFeature.CODEC_NEGOTIATION,
hfp.AgFeature.ESCO_S4_SETTINGS_SUPPORTED,
hfp.AgFeature.ENHANCED_CALL_STATUS,
hfp.AgFeature.THREE_WAY_CALLING,
],
supported_ag_indicators=[
hfp.AgIndicatorState.call(),
hfp.AgIndicatorState.service(),
hfp.AgIndicatorState.callsetup(),
hfp.AgIndicatorState.callsetup(),
hfp.AgIndicatorState.signal(),
hfp.AgIndicatorState.roam(),
hfp.AgIndicatorState.battchg(),
],
supported_hf_indicators=[
hfp.HfIndicator.ENHANCED_SAFETY,
hfp.HfIndicator.BATTERY_LEVEL,
],
supported_ag_call_hold_operations=[
hfp.CallHoldOperation.ADD_HELD_CALL,
hfp.CallHoldOperation.HOLD_ALL_ACTIVE_CALLS,
hfp.CallHoldOperation.HOLD_ALL_CALLS_EXCEPT,
hfp.CallHoldOperation.RELEASE_ALL_ACTIVE_CALLS,
hfp.CallHoldOperation.RELEASE_ALL_HELD_CALLS,
hfp.CallHoldOperation.RELEASE_SPECIFIC_CALL,
hfp.CallHoldOperation.CONNECT_TWO_CALLS,
],
supported_audio_codecs=[hfp.AudioCodec.CVSD, hfp.AudioCodec.MSBC],
)
# -----------------------------------------------------------------------------
def _default_ag_sdp_features() -> hfp.AgSdpFeature:
return (
hfp.AgSdpFeature.WIDE_BAND
| hfp.AgSdpFeature.IN_BAND_RING_TONE_CAPABILITY
| hfp.AgSdpFeature.THREE_WAY_CALLING
)
# -----------------------------------------------------------------------------
async def make_hfp_connections(
hf_config: hfp.Configuration,
) -> Tuple[hfp.HfProtocol, hfp.HfpProtocol]:
hf_config: Optional[hfp.HfConfiguration] = None,
ag_config: Optional[hfp.AgConfiguration] = None,
):
if not hf_config:
hf_config = _default_hf_configuration()
if not ag_config:
ag_config = _default_ag_configuration()
# Setup devices
devices = TwoDevices()
await devices.setup_connection()
@@ -55,38 +140,371 @@ async def make_hfp_connections(
# Setup HFP connection
hf = hfp.HfProtocol(client_dlc, hf_config)
ag = hfp.HfpProtocol(server_dlc)
return hf, ag
ag = hfp.AgProtocol(server_dlc, ag_config)
await hf.initiate_slc()
return (hf, ag)
# -----------------------------------------------------------------------------
@pytest_asyncio.fixture
async def hfp_connections():
hf, ag = await make_hfp_connections()
hf_loop_task = asyncio.create_task(hf.run())
try:
yield (hf, ag)
finally:
# Close the coroutine.
hf.unsolicited_queue.put_nowait(None)
await hf_loop_task
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_slc():
hf_config = hfp.Configuration(
supported_hf_features=[], supported_hf_indicators=[], supported_audio_codecs=[]
)
hf, ag = await make_hfp_connections(hf_config)
async def ag_loop():
while line := await ag.next_line():
if line.startswith('AT+BRSF'):
ag.send_response_line('+BRSF: 0')
elif line.startswith('AT+CIND=?'):
ag.send_response_line(
'+CIND: ("call",(0,1)),("callsetup",(0-3)),("service",(0-1)),'
'("signal",(0-5)),("roam",(0,1)),("battchg",(0-5)),'
'("callheld",(0-2))'
async def test_slc_with_minimal_features():
hf, ag = await make_hfp_connections(
hfp.HfConfiguration(
supported_audio_codecs=[],
supported_hf_features=[],
supported_hf_indicators=[],
),
hfp.AgConfiguration(
supported_ag_call_hold_operations=[],
supported_ag_features=[],
supported_ag_indicators=[
hfp.AgIndicatorState(
indicator=hfp.AgIndicator.CALL,
supported_values={0, 1},
current_status=0,
)
elif line.startswith('AT+CIND?'):
ag.send_response_line('+CIND: 0,0,1,4,1,5,0')
ag.send_response_line('OK')
],
supported_hf_indicators=[],
supported_audio_codecs=[],
),
)
ag_task = asyncio.create_task(ag_loop())
assert hf.supported_ag_features == ag.supported_ag_features
assert hf.supported_hf_features == ag.supported_hf_features
assert hf.supported_ag_call_hold_operations == ag.supported_ag_call_hold_operations
for a, b in zip(hf.ag_indicators, ag.ag_indicators):
assert a.indicator == b.indicator
assert a.current_status == b.current_status
await hf.initiate_slc()
ag_task.cancel()
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_slc(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
hf, ag = hfp_connections
assert hf.supported_ag_features == ag.supported_ag_features
assert hf.supported_hf_features == ag.supported_hf_features
assert hf.supported_ag_call_hold_operations == ag.supported_ag_call_hold_operations
for a, b in zip(hf.ag_indicators, ag.ag_indicators):
assert a.indicator == b.indicator
assert a.current_status == b.current_status
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_ag_indicator(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
hf, ag = hfp_connections
future = asyncio.get_running_loop().create_future()
hf.on('ag_indicator', future.set_result)
ag.update_ag_indicator(hfp.AgIndicator.CALL, 1)
indicator: hfp.AgIndicatorState = await future
assert indicator.current_status == 1
assert indicator.indicator == hfp.AgIndicator.CALL
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_hf_indicator(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
hf, ag = hfp_connections
future = asyncio.get_running_loop().create_future()
ag.on('hf_indicator', future.set_result)
await hf.execute_command('AT+BIEV=2,100')
indicator: hfp.HfIndicatorState = await future
assert indicator.current_status == 100
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_codec_negotiation(
hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]
):
hf, ag = hfp_connections
futures = [
asyncio.get_running_loop().create_future(),
asyncio.get_running_loop().create_future(),
]
hf.on('codec_negotiation', futures[0].set_result)
ag.on('codec_negotiation', futures[1].set_result)
await ag.negotiate_codec(hfp.AudioCodec.MSBC)
assert await futures[0] == await futures[1]
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_dial(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
hf, ag = hfp_connections
NUMBER = 'ATD123456789'
future = asyncio.get_running_loop().create_future()
ag.on('dial', future.set_result)
await hf.execute_command(f'ATD{NUMBER}')
number: str = await future
assert number == NUMBER
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_answer(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
hf, ag = hfp_connections
future = asyncio.get_running_loop().create_future()
ag.on('answer', lambda: future.set_result(None))
await hf.answer_incoming_call()
await future
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_reject_incoming_call(
hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]
):
hf, ag = hfp_connections
future = asyncio.get_running_loop().create_future()
ag.on('hang_up', lambda: future.set_result(None))
await hf.reject_incoming_call()
await future
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_terminate_call(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
hf, ag = hfp_connections
future = asyncio.get_running_loop().create_future()
ag.on('hang_up', lambda: future.set_result(None))
await hf.terminate_call()
await future
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_query_calls_without_calls(
hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]
):
hf, ag = hfp_connections
assert await hf.query_current_calls() == []
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_query_calls_with_calls(
hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]
):
hf, ag = hfp_connections
ag.calls.append(
hfp.CallInfo(
index=1,
direction=hfp.CallInfoDirection.MOBILE_ORIGINATED_CALL,
status=hfp.CallInfoStatus.ACTIVE,
mode=hfp.CallInfoMode.VOICE,
multi_party=hfp.CallInfoMultiParty.NOT_IN_CONFERENCE,
number='123456789',
)
)
assert await hf.query_current_calls() == ag.calls
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
@pytest.mark.parametrize(
"operation,",
(
hfp.CallHoldOperation.RELEASE_ALL_HELD_CALLS,
hfp.CallHoldOperation.RELEASE_ALL_ACTIVE_CALLS,
hfp.CallHoldOperation.HOLD_ALL_ACTIVE_CALLS,
hfp.CallHoldOperation.ADD_HELD_CALL,
hfp.CallHoldOperation.CONNECT_TWO_CALLS,
),
)
async def test_hold_call_without_call_index(
hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol],
operation: hfp.CallHoldOperation,
):
hf, ag = hfp_connections
call_hold_future = asyncio.get_running_loop().create_future()
ag.on("call_hold", lambda op, index: call_hold_future.set_result((op, index)))
await hf.execute_command(f"AT+CHLD={operation.value}")
assert (await call_hold_future) == (operation, None)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
@pytest.mark.parametrize(
"operation,",
(
hfp.CallHoldOperation.RELEASE_SPECIFIC_CALL,
hfp.CallHoldOperation.HOLD_ALL_CALLS_EXCEPT,
),
)
async def test_hold_call_with_call_index(
hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol],
operation: hfp.CallHoldOperation,
):
hf, ag = hfp_connections
call_hold_future = asyncio.get_running_loop().create_future()
ag.on("call_hold", lambda op, index: call_hold_future.set_result((op, index)))
ag.calls.append(
hfp.CallInfo(
index=1,
direction=hfp.CallInfoDirection.MOBILE_ORIGINATED_CALL,
status=hfp.CallInfoStatus.ACTIVE,
mode=hfp.CallInfoMode.VOICE,
multi_party=hfp.CallInfoMultiParty.NOT_IN_CONFERENCE,
number='123456789',
)
)
await hf.execute_command(f"AT+CHLD={operation.value.replace('x', '1')}")
assert (await call_hold_future) == (operation, 1)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_ring(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
hf, ag = hfp_connections
ring_future = asyncio.get_running_loop().create_future()
hf.on("ring", lambda: ring_future.set_result(None))
ag.send_ring()
await ring_future
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_speaker_volume(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
hf, ag = hfp_connections
speaker_volume_future = asyncio.get_running_loop().create_future()
hf.on("speaker_volume", speaker_volume_future.set_result)
ag.set_speaker_volume(10)
assert await speaker_volume_future == 10
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_microphone_volume(
hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]
):
hf, ag = hfp_connections
microphone_volume_future = asyncio.get_running_loop().create_future()
hf.on("microphone_volume", microphone_volume_future.set_result)
ag.set_microphone_volume(10)
assert await microphone_volume_future == 10
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_cli_notification(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
hf, ag = hfp_connections
cli_notification_future = asyncio.get_running_loop().create_future()
hf.on("cli_notification", cli_notification_future.set_result)
ag.send_cli_notification(
hfp.CallLineIdentification(number="\"123456789\"", type=129, alpha="\"Bumble\"")
)
assert await cli_notification_future == hfp.CallLineIdentification(
number="123456789", type=129, alpha="Bumble", subaddr="", satype=None
)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_voice_recognition_from_hf(
hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]
):
hf, ag = hfp_connections
voice_recognition_future = asyncio.get_running_loop().create_future()
ag.on("voice_recognition", voice_recognition_future.set_result)
await hf.execute_command("AT+BVRA=1")
assert await voice_recognition_future == hfp.VoiceRecognitionState.ENABLE
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_voice_recognition_from_ag(
hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]
):
hf, ag = hfp_connections
voice_recognition_future = asyncio.get_running_loop().create_future()
hf.on("voice_recognition", voice_recognition_future.set_result)
ag.send_response("+BVRA: 1")
assert await voice_recognition_future == hfp.VoiceRecognitionState.ENABLE
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_hf_sdp_record():
devices = TwoDevices()
await devices.setup_connection()
devices[0].sdp_service_records[1] = hfp.make_hf_sdp_records(
1, 2, _default_hf_configuration(), hfp.ProfileVersion.V1_8
)
assert await hfp.find_hf_sdp_record(devices.connections[1]) == (
2,
hfp.ProfileVersion.V1_8,
_default_hf_sdp_features(),
)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_ag_sdp_record():
devices = TwoDevices()
await devices.setup_connection()
devices[0].sdp_service_records[1] = hfp.make_ag_sdp_records(
1, 2, _default_ag_configuration(), hfp.ProfileVersion.V1_8
)
assert await hfp.find_ag_sdp_record(devices.connections[1]) == (
2,
hfp.ProfileVersion.V1_8,
_default_ag_sdp_features(),
)
# -----------------------------------------------------------------------------

View File

@@ -32,6 +32,8 @@ from bumble.rfcomm import (
RFCOMM_PSM,
)
_TIMEOUT = 0.1
# -----------------------------------------------------------------------------
def basic_frame_check(x):
@@ -82,6 +84,29 @@ async def test_basic_connection() -> None:
assert await queues[0].get() == b'Lorem ipsum dolor sit amet'
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_receive_pdu_before_open_dlc_returns() -> None:
devices = await test_utils.TwoDevices.create_with_connection()
DATA = b'123'
accept_future: asyncio.Future[DLC] = asyncio.get_running_loop().create_future()
channel = Server(devices[0]).listen(acceptor=accept_future.set_result)
assert devices.connections[1]
multiplexer = await Client(devices.connections[1]).start()
open_dlc_task = asyncio.create_task(multiplexer.open_dlc(channel))
dlc_responder = await accept_future
dlc_responder.write(DATA)
dlc_initiator = await open_dlc_task
dlc_initiator_queue = asyncio.Queue() # type: ignore[var-annotated]
dlc_initiator.sink = dlc_initiator_queue.put_nowait
assert await asyncio.wait_for(dlc_initiator_queue.get(), timeout=_TIMEOUT) == DATA
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_service_record():

View File

@@ -16,7 +16,8 @@
# Imports
# -----------------------------------------------------------------------------
import asyncio
from typing import List, Optional
from typing import List, Optional, Type
from typing_extensions import Self
from bumble.controller import Controller
from bumble.link import LocalLink
@@ -81,6 +82,12 @@ class TwoDevices:
def __getitem__(self, index: int) -> Device:
return self.devices[index]
@classmethod
async def create_with_connection(cls: Type[Self]) -> Self:
devices = cls()
await devices.setup_connection()
return devices
# -----------------------------------------------------------------------------
async def async_barrier():

View File

@@ -0,0 +1,64 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import os
import pytest
import socket
import unittest
from unittest.mock import ANY, patch
from bumble.transport.tcp_server import (
open_tcp_server_transport,
open_tcp_server_transport_with_socket,
)
class OpenTcpServerTransportTests(unittest.TestCase):
def setUp(self):
self.patcher = patch('bumble.transport.tcp_server._create_server')
self.mock_create_server = self.patcher.start()
def tearDown(self):
self.patcher.stop()
def test_open_with_spec(self):
asyncio.run(open_tcp_server_transport('localhost:32100'))
self.mock_create_server.assert_awaited_once_with(
ANY, host='localhost', port=32100
)
def test_open_with_port_only_spec(self):
asyncio.run(open_tcp_server_transport('_:32100'))
self.mock_create_server.assert_awaited_once_with(ANY, host=None, port=32100)
def test_open_with_socket(self):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
asyncio.run(open_tcp_server_transport_with_socket(sock=sock))
self.mock_create_server.assert_awaited_once_with(ANY, sock=sock)
@pytest.mark.skipif(
not os.environ.get('PYTEST_NOSKIP', 0),
reason='''\
Not hermetic. Should only run manually with
$ PYTEST_NOSKIP=1 pytest tests
''',
)
def test_open_with_real_socket():
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.bind(('localhost', 0))
port = sock.getsockname()[1]
assert port != 0
asyncio.run(open_tcp_server_transport_with_socket(sock=sock))

View File

@@ -24,6 +24,11 @@ class PacketSource {
}
class PacketSink {
constructor() {
this.queue = [];
this.isProcessing = false;
}
on_packet(packet) {
if (!this.writer) {
return;
@@ -31,11 +36,24 @@ class PacketSink {
const buffer = packet.toJs({create_proxies : false});
packet.destroy();
//console.log(`HCI[host->controller]: ${bufferToHex(buffer)}`);
// TODO: create an async queue here instead of blindly calling write without awaiting
this.writer(buffer);
this.queue.push(buffer);
this.processQueue();
}
async processQueue() {
if (this.isProcessing) {
return;
}
this.isProcessing = true;
while (this.queue.length > 0) {
const buffer = this.queue.shift();
await this.writer(buffer);
}
this.isProcessing = false;
}
}
class LogEvent extends Event {
constructor(message) {
super('log');
@@ -185,4 +203,4 @@ export async function setupSimpleApp(appUrl, bumbleControls, log) {
bumbleControls.onBumbleLoaded();
return app;
}
}