Type hint all examples

This commit is contained in:
zxzxwu
2024-04-15 12:16:01 +00:00
parent 8758856e8c
commit 51a94288e2
34 changed files with 242 additions and 129 deletions

View File

@@ -90,6 +90,22 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# Utils
# -----------------------------------------------------------------------------
def show_services(services: Iterable[ServiceProxy]) -> None:
for service in services:
print(color(str(service), 'cyan'))
for characteristic in service.characteristics:
print(color(' ' + str(characteristic), 'magenta'))
for descriptor in characteristic.descriptors:
print(color(' ' + str(descriptor), 'green'))
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Proxies # Proxies
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

View File

@@ -184,7 +184,7 @@ class Host(AbortableEventEmitter):
self.long_term_key_provider = None self.long_term_key_provider = None
self.link_key_provider = None self.link_key_provider = None
self.pairing_io_capability_provider = None # Classic only self.pairing_io_capability_provider = None # Classic only
self.snooper = None self.snooper: Optional[Snooper] = None
# Connect to the source and sink if specified # Connect to the source and sink if specified
if controller_source: if controller_source:

View File

@@ -19,8 +19,8 @@
import struct import struct
from typing import Optional, Tuple from typing import Optional, Tuple
from ..gatt_client import ProfileServiceProxy from bumble.gatt_client import ServiceProxy, ProfileServiceProxy, CharacteristicProxy
from ..gatt import ( from bumble.gatt import (
GATT_DEVICE_INFORMATION_SERVICE, GATT_DEVICE_INFORMATION_SERVICE,
GATT_FIRMWARE_REVISION_STRING_CHARACTERISTIC, GATT_FIRMWARE_REVISION_STRING_CHARACTERISTIC,
GATT_HARDWARE_REVISION_STRING_CHARACTERISTIC, GATT_HARDWARE_REVISION_STRING_CHARACTERISTIC,
@@ -104,7 +104,16 @@ class DeviceInformationService(TemplateService):
class DeviceInformationServiceProxy(ProfileServiceProxy): class DeviceInformationServiceProxy(ProfileServiceProxy):
SERVICE_CLASS = DeviceInformationService SERVICE_CLASS = DeviceInformationService
def __init__(self, service_proxy): manufacturer_name: Optional[UTF8CharacteristicAdapter]
model_number: Optional[UTF8CharacteristicAdapter]
serial_number: Optional[UTF8CharacteristicAdapter]
hardware_revision: Optional[UTF8CharacteristicAdapter]
firmware_revision: Optional[UTF8CharacteristicAdapter]
software_revision: Optional[UTF8CharacteristicAdapter]
system_id: Optional[DelegatedCharacteristicAdapter]
ieee_regulatory_certification_data_list: Optional[CharacteristicProxy]
def __init__(self, service_proxy: ServiceProxy):
self.service_proxy = service_proxy self.service_proxy = service_proxy
for field, uuid in ( for field, uuid in (

View File

@@ -61,7 +61,7 @@ async def func4(x, y):
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def main(): async def main() -> None:
print("MAIN: start, loop=", asyncio.get_running_loop()) print("MAIN: start, loop=", asyncio.get_running_loop())
print("MAIN: invoke func1") print("MAIN: invoke func1")
func1(1, 2) func1(1, 2)

View File

@@ -21,23 +21,29 @@ import os
import logging import logging
from bumble.colors import color from bumble.colors import color
from bumble.device import Device from bumble.device import Device
from bumble.hci import Address
from bumble.transport import open_transport from bumble.transport import open_transport
from bumble.profiles.battery_service import BatteryServiceProxy from bumble.profiles.battery_service import BatteryServiceProxy
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def main(): async def main() -> None:
if len(sys.argv) != 3: if len(sys.argv) != 3:
print('Usage: battery_client.py <transport-spec> <bluetooth-address>') print('Usage: battery_client.py <transport-spec> <bluetooth-address>')
print('example: battery_client.py usb:0 E1:CA:72:48:C4:E8') print('example: battery_client.py usb:0 E1:CA:72:48:C4:E8')
return return
print('<<< connecting to HCI...') print('<<< connecting to HCI...')
async with await open_transport(sys.argv[1]) as (hci_source, hci_sink): async with await open_transport(sys.argv[1]) as hci_transport:
print('<<< connected') print('<<< connected')
# Create and start a device # Create and start a device
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink) device = Device.with_hci(
'Bumble',
Address('F0:F1:F2:F3:F4:F5'),
hci_transport.source,
hci_transport.sink,
)
await device.power_on() await device.power_on()
# Connect to the peer # Connect to the peer

View File

@@ -29,14 +29,16 @@ from bumble.profiles.battery_service import BatteryService
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def main(): async def main() -> None:
if len(sys.argv) != 3: if len(sys.argv) != 3:
print('Usage: python battery_server.py <device-config> <transport-spec>') print('Usage: python battery_server.py <device-config> <transport-spec>')
print('example: python battery_server.py device1.json usb:0') print('example: python battery_server.py device1.json usb:0')
return return
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): async with await open_transport_or_link(sys.argv[2]) as hci_transport:
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
# Add a Battery Service to the GATT sever # Add a Battery Service to the GATT sever
battery_service = BatteryService(lambda _: random.randint(0, 100)) battery_service = BatteryService(lambda _: random.randint(0, 100))

View File

@@ -21,12 +21,13 @@ import os
import logging import logging
from bumble.colors import color from bumble.colors import color
from bumble.device import Device, Peer from bumble.device import Device, Peer
from bumble.hci import Address
from bumble.profiles.device_information_service import DeviceInformationServiceProxy from bumble.profiles.device_information_service import DeviceInformationServiceProxy
from bumble.transport import open_transport from bumble.transport import open_transport
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def main(): async def main() -> None:
if len(sys.argv) != 3: if len(sys.argv) != 3:
print( print(
'Usage: device_information_client.py <transport-spec> <bluetooth-address>' 'Usage: device_information_client.py <transport-spec> <bluetooth-address>'
@@ -35,11 +36,16 @@ async def main():
return return
print('<<< connecting to HCI...') print('<<< connecting to HCI...')
async with await open_transport(sys.argv[1]) as (hci_source, hci_sink): async with await open_transport(sys.argv[1]) as hci_transport:
print('<<< connected') print('<<< connected')
# Create and start a device # Create and start a device
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink) device = Device.with_hci(
'Bumble',
Address('F0:F1:F2:F3:F4:F5'),
hci_transport.source,
hci_transport.sink,
)
await device.power_on() await device.power_on()
# Connect to the peer # Connect to the peer

View File

@@ -28,14 +28,16 @@ from bumble.profiles.device_information_service import DeviceInformationService
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def main(): async def main() -> None:
if len(sys.argv) != 3: if len(sys.argv) != 3:
print('Usage: python device_info_server.py <device-config> <transport-spec>') print('Usage: python device_info_server.py <device-config> <transport-spec>')
print('example: python device_info_server.py device1.json usb:0') print('example: python device_info_server.py device1.json usb:0')
return return
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): async with await open_transport_or_link(sys.argv[2]) as hci_transport:
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
# Add a Device Information Service to the GATT sever # Add a Device Information Service to the GATT sever
device_information_service = DeviceInformationService( device_information_service = DeviceInformationService(
@@ -64,7 +66,7 @@ async def main():
# Go! # Go!
await device.power_on() await device.power_on()
await device.start_advertising(auto_restart=True) await device.start_advertising(auto_restart=True)
await hci_source.wait_for_termination() await hci_transport.source.wait_for_termination()
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

View File

@@ -21,23 +21,29 @@ import os
import logging import logging
from bumble.colors import color from bumble.colors import color
from bumble.device import Device from bumble.device import Device
from bumble.hci import Address
from bumble.transport import open_transport from bumble.transport import open_transport
from bumble.profiles.heart_rate_service import HeartRateServiceProxy from bumble.profiles.heart_rate_service import HeartRateServiceProxy
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def main(): async def main() -> None:
if len(sys.argv) != 3: if len(sys.argv) != 3:
print('Usage: heart_rate_client.py <transport-spec> <bluetooth-address>') print('Usage: heart_rate_client.py <transport-spec> <bluetooth-address>')
print('example: heart_rate_client.py usb:0 E1:CA:72:48:C4:E8') print('example: heart_rate_client.py usb:0 E1:CA:72:48:C4:E8')
return return
print('<<< connecting to HCI...') print('<<< connecting to HCI...')
async with await open_transport(sys.argv[1]) as (hci_source, hci_sink): async with await open_transport(sys.argv[1]) as hci_transport:
print('<<< connected') print('<<< connected')
# Create and start a device # Create and start a device
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink) device = Device.with_hci(
'Bumble',
Address('F0:F1:F2:F3:F4:F5'),
hci_transport.source,
hci_transport.sink,
)
await device.power_on() await device.power_on()
# Connect to the peer # Connect to the peer

View File

@@ -33,14 +33,16 @@ from bumble.utils import AsyncRunner
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def main(): async def main() -> None:
if len(sys.argv) != 3: if len(sys.argv) != 3:
print('Usage: python heart_rate_server.py <device-config> <transport-spec>') print('Usage: python heart_rate_server.py <device-config> <transport-spec>')
print('example: python heart_rate_server.py device1.json usb:0') print('example: python heart_rate_server.py device1.json usb:0')
return return
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): async with await open_transport_or_link(sys.argv[2]) as hci_transport:
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
# Keep track of accumulated expended energy # Keep track of accumulated expended energy
energy_start_time = time.time() energy_start_time = time.time()

View File

@@ -416,7 +416,7 @@ async def keyboard_device(device, command):
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def main(): async def main() -> None:
if len(sys.argv) < 4: if len(sys.argv) < 4:
print( print(
'Usage: python keyboard.py <device-config> <transport-spec> <command>' 'Usage: python keyboard.py <device-config> <transport-spec> <command>'
@@ -434,9 +434,11 @@ async def main():
) )
return return
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): async with await open_transport_or_link(sys.argv[2]) as hci_transport:
# Create a device to manage the host # Create a device to manage the host
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
command = sys.argv[3] command = sys.argv[3]
if command == 'connect': if command == 'connect':

View File

@@ -139,18 +139,20 @@ async def find_a2dp_service(connection):
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def main(): async def main() -> None:
if len(sys.argv) < 4: if len(sys.argv) < 4:
print('Usage: run_a2dp_info.py <device-config> <transport-spec> <bt-addr>') print('Usage: run_a2dp_info.py <device-config> <transport-spec> <bt-addr>')
print('example: run_a2dp_info.py classic1.json usb:0 14:7D:DA:4E:53:A8') print('example: run_a2dp_info.py classic1.json usb:0 14:7D:DA:4E:53:A8')
return return
print('<<< connecting to HCI...') print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected') print('<<< connected')
# Create a device # Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.classic_enabled = True device.classic_enabled = True
# Start the controller # Start the controller
@@ -187,7 +189,7 @@ async def main():
client = await AVDTP_Protocol.connect(connection, avdtp_version) client = await AVDTP_Protocol.connect(connection, avdtp_version)
# Discover all endpoints on the remote device # Discover all endpoints on the remote device
endpoints = await client.discover_remote_endpoints() endpoints = list(await client.discover_remote_endpoints())
print(f'@@@ Found {len(endpoints)} endpoints') print(f'@@@ Found {len(endpoints)} endpoints')
for endpoint in endpoints: for endpoint in endpoints:
print('@@@', endpoint) print('@@@', endpoint)

View File

@@ -19,6 +19,7 @@ import asyncio
import sys import sys
import os import os
import logging import logging
from typing import Any, Dict
from bumble.device import Device from bumble.device import Device
from bumble.transport import open_transport_or_link from bumble.transport import open_transport_or_link
@@ -41,7 +42,7 @@ from bumble.a2dp import (
SbcMediaCodecInformation, SbcMediaCodecInformation,
) )
Context = {'output': None} Context: Dict[Any, Any] = {'output': None}
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -104,7 +105,7 @@ def on_rtp_packet(packet):
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def main(): async def main() -> None:
if len(sys.argv) < 4: if len(sys.argv) < 4:
print( print(
'Usage: run_a2dp_sink.py <device-config> <transport-spec> <sbc-file> ' 'Usage: run_a2dp_sink.py <device-config> <transport-spec> <sbc-file> '
@@ -114,14 +115,16 @@ async def main():
return return
print('<<< connecting to HCI...') print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected') print('<<< connected')
with open(sys.argv[3], 'wb') as sbc_file: with open(sys.argv[3], 'wb') as sbc_file:
Context['output'] = sbc_file Context['output'] = sbc_file
# Create a device # Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.classic_enabled = True device.classic_enabled = True
# Setup the SDP to expose the sink service # Setup the SDP to expose the sink service
@@ -162,7 +165,7 @@ async def main():
await device.set_discoverable(True) await device.set_discoverable(True)
await device.set_connectable(True) await device.set_connectable(True)
await hci_source.wait_for_termination() await hci_transport.source.wait_for_termination()
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

View File

@@ -114,7 +114,7 @@ async def stream_packets(read_function, protocol):
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def main(): async def main() -> None:
if len(sys.argv) < 4: if len(sys.argv) < 4:
print( print(
'Usage: run_a2dp_source.py <device-config> <transport-spec> <sbc-file> ' 'Usage: run_a2dp_source.py <device-config> <transport-spec> <sbc-file> '
@@ -126,11 +126,13 @@ async def main():
return return
print('<<< connecting to HCI...') print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected') print('<<< connected')
# Create a device # Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.classic_enabled = True device.classic_enabled = True
# Setup the SDP to expose the SRC service # Setup the SDP to expose the SRC service
@@ -186,7 +188,7 @@ async def main():
await device.set_discoverable(True) await device.set_discoverable(True)
await device.set_connectable(True) await device.set_connectable(True)
await hci_source.wait_for_termination() await hci_transport.source.wait_for_termination()
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

View File

@@ -28,7 +28,7 @@ from bumble.transport import open_transport_or_link
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def main(): async def main() -> None:
if len(sys.argv) < 3: if len(sys.argv) < 3:
print( print(
'Usage: run_advertiser.py <config-file> <transport-spec> [type] [address]' 'Usage: run_advertiser.py <config-file> <transport-spec> [type] [address]'
@@ -50,10 +50,12 @@ async def main():
target = None target = None
print('<<< connecting to HCI...') print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected') print('<<< connected')
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
if advertising_type.is_scannable: if advertising_type.is_scannable:
device.scan_response_data = bytes( device.scan_response_data = bytes(
@@ -66,7 +68,7 @@ async def main():
await device.power_on() await device.power_on()
await device.start_advertising(advertising_type=advertising_type, target=target) await device.start_advertising(advertising_type=advertising_type, target=target)
await hci_source.wait_for_termination() await hci_transport.source.wait_for_termination()
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

View File

@@ -49,7 +49,7 @@ ASHA_LE_PSM_OUT_CHARACTERISTIC = UUID(
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def main(): async def main() -> None:
if len(sys.argv) != 4: if len(sys.argv) != 4:
print( print(
'Usage: python run_asha_sink.py <device-config> <transport-spec> ' 'Usage: python run_asha_sink.py <device-config> <transport-spec> '
@@ -60,8 +60,10 @@ async def main():
audio_out = open(sys.argv[3], 'wb') audio_out = open(sys.argv[3], 'wb')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): async with await open_transport_or_link(sys.argv[2]) as hci_transport:
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
# Handler for audio control commands # Handler for audio control commands
def on_audio_control_point_write(_connection, value): def on_audio_control_point_write(_connection, value):
@@ -197,7 +199,7 @@ async def main():
await device.power_on() await device.power_on()
await device.start_advertising(auto_restart=True) await device.start_advertising(auto_restart=True)
await hci_source.wait_for_termination() await hci_transport.source.wait_for_termination()
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

View File

@@ -331,7 +331,7 @@ class Delegate(avrcp.Delegate):
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def main(): async def main() -> None:
if len(sys.argv) < 3: if len(sys.argv) < 3:
print( print(
'Usage: run_avrcp_controller.py <device-config> <transport-spec> ' 'Usage: run_avrcp_controller.py <device-config> <transport-spec> '
@@ -341,11 +341,13 @@ async def main():
return return
print('<<< connecting to HCI...') print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected') print('<<< connected')
# Create a device # Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.classic_enabled = True device.classic_enabled = True
# Setup the SDP to expose the sink service # Setup the SDP to expose the sink service

View File

@@ -32,7 +32,7 @@ from bumble.sdp import (
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def main(): async def main() -> None:
if len(sys.argv) < 3: if len(sys.argv) < 3:
print( print(
'Usage: run_classic_connect.py <device-config> <transport-spec> ' 'Usage: run_classic_connect.py <device-config> <transport-spec> '
@@ -42,11 +42,13 @@ async def main():
return return
print('<<< connecting to HCI...') print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected') print('<<< connected')
# Create a device # Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.classic_enabled = True device.classic_enabled = True
device.le_enabled = False device.le_enabled = False
await device.power_on() await device.power_on()

View File

@@ -91,18 +91,20 @@ SDP_SERVICE_RECORDS = {
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def main(): async def main() -> None:
if len(sys.argv) < 3: if len(sys.argv) < 3:
print('Usage: run_classic_discoverable.py <device-config> <transport-spec>') print('Usage: run_classic_discoverable.py <device-config> <transport-spec>')
print('example: run_classic_discoverable.py classic1.json usb:04b4:f901') print('example: run_classic_discoverable.py classic1.json usb:04b4:f901')
return return
print('<<< connecting to HCI...') print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected') print('<<< connected')
# Create a device # Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.classic_enabled = True device.classic_enabled = True
device.sdp_service_records = SDP_SERVICE_RECORDS device.sdp_service_records = SDP_SERVICE_RECORDS
await device.power_on() await device.power_on()
@@ -111,7 +113,7 @@ async def main():
await device.set_discoverable(True) await device.set_discoverable(True)
await device.set_connectable(True) await device.set_connectable(True)
await hci_source.wait_for_termination() await hci_transport.source.wait_for_termination()
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

View File

@@ -20,8 +20,8 @@ import sys
import os import os
import logging import logging
from bumble.colors import color from bumble.colors import color
from bumble.device import Device from bumble.device import Device
from bumble.hci import Address
from bumble.transport import open_transport_or_link from bumble.transport import open_transport_or_link
from bumble.core import DeviceClass from bumble.core import DeviceClass
@@ -53,22 +53,27 @@ class DiscoveryListener(Device.Listener):
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def main(): async def main() -> None:
if len(sys.argv) != 2: if len(sys.argv) != 2:
print('Usage: run_classic_discovery.py <transport-spec>') print('Usage: run_classic_discovery.py <transport-spec>')
print('example: run_classic_discovery.py usb:04b4:f901') print('example: run_classic_discovery.py usb:04b4:f901')
return return
print('<<< connecting to HCI...') print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[1]) as (hci_source, hci_sink): async with await open_transport_or_link(sys.argv[1]) as hci_transport:
print('<<< connected') print('<<< connected')
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink) device = Device.with_hci(
'Bumble',
Address('F0:F1:F2:F3:F4:F5'),
hci_transport.source,
hci_transport.sink,
)
device.listener = DiscoveryListener() device.listener = DiscoveryListener()
await device.power_on() await device.power_on()
await device.start_discovery() await device.start_discovery()
await hci_source.wait_for_termination() await hci_transport.source.wait_for_termination()
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

View File

@@ -25,7 +25,7 @@ from bumble.transport import open_transport_or_link
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def main(): async def main() -> None:
if len(sys.argv) < 3: if len(sys.argv) < 3:
print( print(
'Usage: run_connect_and_encrypt.py <device-config> <transport-spec> ' 'Usage: run_connect_and_encrypt.py <device-config> <transport-spec> '
@@ -37,11 +37,13 @@ async def main():
return return
print('<<< connecting to HCI...') print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected') print('<<< connected')
# Create a device # Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
await device.power_on() await device.power_on()
# Connect to the peer # Connect to the peer
@@ -56,7 +58,7 @@ async def main():
print(f'!!! Encryption failed: {error}') print(f'!!! Encryption failed: {error}')
return return
await hci_source.wait_for_termination() await hci_transport.source.wait_for_termination()
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

View File

@@ -36,7 +36,7 @@ from bumble.transport import open_transport_or_link
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def main(): async def main() -> None:
if len(sys.argv) != 4: if len(sys.argv) != 4:
print( print(
'Usage: run_controller.py <controller-address> <device-config> ' 'Usage: run_controller.py <controller-address> <device-config> '
@@ -49,7 +49,7 @@ async def main():
return return
print('>>> connecting to HCI...') print('>>> connecting to HCI...')
async with await open_transport_or_link(sys.argv[3]) as (hci_source, hci_sink): async with await open_transport_or_link(sys.argv[3]) as hci_transport:
print('>>> connected') print('>>> connected')
# Create a local link # Create a local link
@@ -57,7 +57,10 @@ async def main():
# Create a first controller using the packet source/sink as its host interface # Create a first controller using the packet source/sink as its host interface
controller1 = Controller( controller1 = Controller(
'C1', host_source=hci_source, host_sink=hci_sink, link=link 'C1',
host_source=hci_transport.source,
host_sink=hci_transport.sink,
link=link,
) )
controller1.random_address = sys.argv[1] controller1.random_address = sys.argv[1]
@@ -98,7 +101,7 @@ async def main():
await device.start_advertising() await device.start_advertising()
await device.start_scanning() await device.start_scanning()
await hci_source.wait_for_termination() await hci_transport.source.wait_for_termination()
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

View File

@@ -20,9 +20,9 @@ import asyncio
import sys import sys
import os import os
from bumble.colors import color from bumble.colors import color
from bumble.device import Device from bumble.device import Device
from bumble.controller import Controller from bumble.controller import Controller
from bumble.hci import Address
from bumble.link import LocalLink from bumble.link import LocalLink
from bumble.transport import open_transport_or_link from bumble.transport import open_transport_or_link
@@ -45,14 +45,14 @@ class ScannerListener(Device.Listener):
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def main(): async def main() -> None:
if len(sys.argv) != 2: if len(sys.argv) != 2:
print('Usage: run_controller.py <transport-spec>') print('Usage: run_controller.py <transport-spec>')
print('example: run_controller_with_scanner.py serial:/dev/pts/14,1000000') print('example: run_controller_with_scanner.py serial:/dev/pts/14,1000000')
return return
print('>>> connecting to HCI...') print('>>> connecting to HCI...')
async with await open_transport_or_link(sys.argv[1]) as (hci_source, hci_sink): async with await open_transport_or_link(sys.argv[1]) as hci_transport:
print('>>> connected') print('>>> connected')
# Create a local link # Create a local link
@@ -60,22 +60,25 @@ async def main():
# Create a first controller using the packet source/sink as its host interface # Create a first controller using the packet source/sink as its host interface
controller1 = Controller( controller1 = Controller(
'C1', host_source=hci_source, host_sink=hci_sink, link=link 'C1',
host_source=hci_transport.source,
host_sink=hci_transport.sink,
link=link,
public_address='E0:E1:E2:E3:E4:E5',
) )
controller1.address = 'E0:E1:E2:E3:E4:E5'
# Create a second controller using the same link # Create a second controller using the same link
controller2 = Controller('C2', link=link) controller2 = Controller('C2', link=link)
# Create a device with a scanner listener # Create a device with a scanner listener
device = Device.with_hci( device = Device.with_hci(
'Bumble', 'F0:F1:F2:F3:F4:F5', controller2, controller2 'Bumble', Address('F0:F1:F2:F3:F4:F5'), controller2, controller2
) )
device.listener = ScannerListener() device.listener = ScannerListener()
await device.power_on() await device.power_on()
await device.start_scanning() await device.start_scanning()
await hci_source.wait_for_termination() await hci_transport.source.wait_for_termination()
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

View File

@@ -20,31 +20,36 @@ import sys
import os import os
import logging import logging
from bumble.colors import color from bumble.colors import color
from bumble.hci import Address
from bumble.device import Device from bumble.device import Device
from bumble.transport import open_transport_or_link from bumble.transport import open_transport_or_link
from bumble.snoop import BtSnooper from bumble.snoop import BtSnooper
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def main(): async def main() -> None:
if len(sys.argv) != 3: if len(sys.argv) != 3:
print('Usage: run_device_with_snooper.py <transport-spec> <snoop-file>') print('Usage: run_device_with_snooper.py <transport-spec> <snoop-file>')
print('example: run_device_with_snooper.py usb:0 btsnoop.log') print('example: run_device_with_snooper.py usb:0 btsnoop.log')
return return
print('<<< connecting to HCI...') print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[1]) as (hci_source, hci_sink): async with await open_transport_or_link(sys.argv[1]) as hci_transport:
print('<<< connected') print('<<< connected')
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink) device = Device.with_hci(
'Bumble',
Address('F0:F1:F2:F3:F4:F5'),
hci_transport.source,
hci_transport.sink,
)
with open(sys.argv[2], "wb") as snoop_file: with open(sys.argv[2], "wb") as snoop_file:
device.host.snooper = BtSnooper(snoop_file) device.host.snooper = BtSnooper(snoop_file)
await device.power_on() await device.power_on()
await device.start_scanning() await device.start_scanning()
await hci_source.wait_for_termination() await hci_transport.source.wait_for_termination()
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

View File

@@ -69,7 +69,7 @@ class Listener(Device.Listener):
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def main(): async def main() -> None:
if len(sys.argv) < 3: if len(sys.argv) < 3:
print( print(
'Usage: run_gatt_client.py <device-config> <transport-spec> ' 'Usage: run_gatt_client.py <device-config> <transport-spec> '
@@ -79,11 +79,13 @@ async def main():
return return
print('<<< connecting to HCI...') print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected') print('<<< connected')
# Create a device to manage the host, with a custom listener # Create a device to manage the host, with a custom listener
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.listener = Listener(device) device.listener = Listener(device)
await device.power_on() await device.power_on()

View File

@@ -19,21 +19,21 @@ import asyncio
import os import os
import logging import logging
from bumble.colors import color from bumble.colors import color
from bumble.core import ProtocolError from bumble.core import ProtocolError
from bumble.controller import Controller from bumble.controller import Controller
from bumble.device import Device, Peer from bumble.device import Device, Peer
from bumble.hci import Address
from bumble.host import Host from bumble.host import Host
from bumble.link import LocalLink from bumble.link import LocalLink
from bumble.gatt import ( from bumble.gatt import (
Service, Service,
Characteristic, Characteristic,
Descriptor, Descriptor,
show_services,
GATT_CHARACTERISTIC_USER_DESCRIPTION_DESCRIPTOR, GATT_CHARACTERISTIC_USER_DESCRIPTION_DESCRIPTOR,
GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC, GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
GATT_DEVICE_INFORMATION_SERVICE, GATT_DEVICE_INFORMATION_SERVICE,
) )
from bumble.gatt_client import show_services
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -43,7 +43,7 @@ class ServerListener(Device.Listener):
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def main(): async def main() -> None:
# Create a local link # Create a local link
link = LocalLink() link = LocalLink()
@@ -51,14 +51,18 @@ async def main():
client_controller = Controller("client controller", link=link) client_controller = Controller("client controller", link=link)
client_host = Host() client_host = Host()
client_host.controller = client_controller client_host.controller = client_controller
client_device = Device("client", address='F0:F1:F2:F3:F4:F5', host=client_host) client_device = Device(
"client", address=Address('F0:F1:F2:F3:F4:F5'), host=client_host
)
await client_device.power_on() await client_device.power_on()
# Setup a stack for the server # Setup a stack for the server
server_controller = Controller("server controller", link=link) server_controller = Controller("server controller", link=link)
server_host = Host() server_host = Host()
server_host.controller = server_controller server_host.controller = server_controller
server_device = Device("server", address='F6:F7:F8:F9:FA:FB', host=server_host) server_device = Device(
"server", address=Address('F6:F7:F8:F9:FA:FB'), host=server_host
)
server_device.listener = ServerListener() server_device.listener = ServerListener()
await server_device.power_on() await server_device.power_on()

View File

@@ -71,7 +71,7 @@ def my_custom_write_with_error(connection, value):
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def main(): async def main() -> None:
if len(sys.argv) < 3: if len(sys.argv) < 3:
print( print(
'Usage: run_gatt_server.py <device-config> <transport-spec> ' 'Usage: run_gatt_server.py <device-config> <transport-spec> '
@@ -81,11 +81,13 @@ async def main():
return return
print('<<< connecting to HCI...') print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected') print('<<< connected')
# Create a device to manage the host # Create a device to manage the host
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.listener = Listener(device) device.listener = Listener(device)
# Add a few entries to the device's GATT server # Add a few entries to the device's GATT server
@@ -146,7 +148,7 @@ async def main():
else: else:
await device.start_advertising(auto_restart=True) await device.start_advertising(auto_restart=True)
await hci_source.wait_for_termination() await hci_transport.source.wait_for_termination()
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

View File

@@ -84,14 +84,14 @@ def on_dlc(dlc: rfcomm.DLC, configuration: hfp.HfConfiguration):
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def main(): async def main() -> None:
if len(sys.argv) < 3: if len(sys.argv) < 3:
print('Usage: run_classic_hfp.py <device-config> <transport-spec>') print('Usage: run_classic_hfp.py <device-config> <transport-spec>')
print('example: run_classic_hfp.py classic2.json usb:04b4:f901') print('example: run_classic_hfp.py classic2.json usb:04b4:f901')
return return
print('<<< connecting to HCI...') print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected') print('<<< connected')
# Hands-Free profile configuration. # Hands-Free profile configuration.
@@ -116,7 +116,9 @@ async def main():
) )
# Create a device # Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.classic_enabled = True device.classic_enabled = True
# Create and register a server # Create and register a server
@@ -128,7 +130,9 @@ async def main():
# Advertise the HFP RFComm channel in the SDP # Advertise the HFP RFComm channel in the SDP
device.sdp_service_records = { device.sdp_service_records = {
0x00010001: hfp.sdp_records(0x00010001, channel_number, configuration) 0x00010001: hfp.make_hf_sdp_records(
0x00010001, channel_number, configuration
)
} }
# Let's go! # Let's go!
@@ -164,7 +168,7 @@ async def main():
await websockets.serve(serve, 'localhost', 8989) await websockets.serve(serve, 'localhost', 8989)
await hci_source.wait_for_termination() await hci_transport.source.wait_for_termination()
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

View File

@@ -489,7 +489,7 @@ async def keyboard_device(hid_device):
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def main(): async def main() -> None:
if len(sys.argv) < 3: if len(sys.argv) < 3:
print( print(
'Usage: python run_hid_device.py <device-config> <transport-spec> <command>' 'Usage: python run_hid_device.py <device-config> <transport-spec> <command>'
@@ -601,11 +601,13 @@ async def main():
asyncio.create_task(handle_virtual_cable_unplug()) asyncio.create_task(handle_virtual_cable_unplug())
print('<<< connecting to HCI...') print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected') print('<<< connected')
# Create a device # Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.classic_enabled = True device.classic_enabled = True
# Create and register HID device # Create and register HID device
@@ -742,7 +744,7 @@ async def main():
print("Executing in Web mode") print("Executing in Web mode")
await keyboard_device(hid_device) await keyboard_device(hid_device)
await hci_source.wait_for_termination() await hci_transport.source.wait_for_termination()
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

View File

@@ -275,7 +275,7 @@ async def get_stream_reader(pipe) -> asyncio.StreamReader:
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def main(): async def main() -> None:
if len(sys.argv) < 4: if len(sys.argv) < 4:
print( print(
'Usage: run_hid_host.py <device-config> <transport-spec> ' 'Usage: run_hid_host.py <device-config> <transport-spec> '
@@ -324,11 +324,13 @@ async def main():
asyncio.create_task(handle_virtual_cable_unplug()) asyncio.create_task(handle_virtual_cable_unplug())
print('<<< connecting to HCI...') print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< CONNECTED') print('<<< CONNECTED')
# Create a device # Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.classic_enabled = True device.classic_enabled = True
# Create HID host and start it # Create HID host and start it
@@ -557,7 +559,7 @@ async def main():
# Interrupt Channel # Interrupt Channel
await hid_host.connect_interrupt_channel() await hid_host.connect_interrupt_channel()
await hci_source.wait_for_termination() await hci_transport.source.wait_for_termination()
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

View File

@@ -57,18 +57,20 @@ def on_my_characteristic_subscription(peer, enabled):
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def main(): async def main() -> None:
if len(sys.argv) < 3: if len(sys.argv) < 3:
print('Usage: run_notifier.py <device-config> <transport-spec>') print('Usage: run_notifier.py <device-config> <transport-spec>')
print('example: run_notifier.py device1.json usb:0') print('example: run_notifier.py device1.json usb:0')
return return
print('<<< connecting to HCI...') print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected') print('<<< connected')
# Create a device to manage the host # Create a device to manage the host
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.listener = Listener(device) device.listener = Listener(device)
# Add a few entries to the device's GATT server # Add a few entries to the device's GATT server

View File

@@ -165,7 +165,7 @@ async def tcp_server(tcp_port, rfcomm_session):
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def main(): async def main() -> None:
if len(sys.argv) < 5: if len(sys.argv) < 5:
print( print(
'Usage: run_rfcomm_client.py <device-config> <transport-spec> ' 'Usage: run_rfcomm_client.py <device-config> <transport-spec> '
@@ -178,11 +178,13 @@ async def main():
return return
print('<<< connecting to HCI...') print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected') print('<<< connected')
# Create a device # Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.classic_enabled = True device.classic_enabled = True
await device.power_on() await device.power_on()
@@ -192,8 +194,8 @@ async def main():
connection = await device.connect(target_address, transport=BT_BR_EDR_TRANSPORT) connection = await device.connect(target_address, transport=BT_BR_EDR_TRANSPORT)
print(f'=== Connected to {connection.peer_address}!') print(f'=== Connected to {connection.peer_address}!')
channel = sys.argv[4] channel_str = sys.argv[4]
if channel == 'discover': if channel_str == 'discover':
await list_rfcomm_channels(connection) await list_rfcomm_channels(connection)
return return
@@ -213,7 +215,7 @@ async def main():
rfcomm_mux = await rfcomm_client.start() rfcomm_mux = await rfcomm_client.start()
print('@@@ Started') print('@@@ Started')
channel = int(channel) channel = int(channel_str)
print(f'### Opening session for channel {channel}...') print(f'### Opening session for channel {channel}...')
try: try:
session = await rfcomm_mux.open_dlc(channel) session = await rfcomm_mux.open_dlc(channel)
@@ -229,7 +231,7 @@ async def main():
tcp_port = int(sys.argv[5]) tcp_port = int(sys.argv[5])
asyncio.create_task(tcp_server(tcp_port, session)) asyncio.create_task(tcp_server(tcp_port, session))
await hci_source.wait_for_termination() await hci_transport.source.wait_for_termination()
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

View File

@@ -107,7 +107,7 @@ class TcpServer:
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def main(): async def main() -> None:
if len(sys.argv) < 4: if len(sys.argv) < 4:
print( print(
'Usage: run_rfcomm_server.py <device-config> <transport-spec> ' 'Usage: run_rfcomm_server.py <device-config> <transport-spec> '
@@ -124,11 +124,13 @@ async def main():
uuid = 'E6D55659-C8B4-4B85-96BB-B1143AF6D3AE' uuid = 'E6D55659-C8B4-4B85-96BB-B1143AF6D3AE'
print('<<< connecting to HCI...') print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected') print('<<< connected')
# Create a device # Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.classic_enabled = True device.classic_enabled = True
# Create a TCP server # Create a TCP server
@@ -153,7 +155,7 @@ async def main():
await device.set_discoverable(True) await device.set_discoverable(True)
await device.set_connectable(True) await device.set_connectable(True)
await hci_source.wait_for_termination() await hci_transport.source.wait_for_termination()
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

View File

@@ -20,27 +20,31 @@ import sys
import os import os
import logging import logging
from bumble.colors import color from bumble.colors import color
from bumble.hci import Address
from bumble.device import Device from bumble.device import Device
from bumble.transport import open_transport_or_link from bumble.transport import open_transport_or_link
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def main(): async def main() -> None:
if len(sys.argv) < 2: if len(sys.argv) < 2:
print('Usage: run_scanner.py <transport-spec> [filter]') print('Usage: run_scanner.py <transport-spec> [filter]')
print('example: run_scanner.py usb:0') print('example: run_scanner.py usb:0')
return return
print('<<< connecting to HCI...') print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[1]) as (hci_source, hci_sink): async with await open_transport_or_link(sys.argv[1]) as hci_transport:
print('<<< connected') print('<<< connected')
filter_duplicates = len(sys.argv) == 3 and sys.argv[2] == 'filter' filter_duplicates = len(sys.argv) == 3 and sys.argv[2] == 'filter'
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink) device = Device.with_hci(
'Bumble',
Address('F0:F1:F2:F3:F4:F5'),
hci_transport.source,
hci_transport.sink,
)
@device.on('advertisement') def on_adv(advertisement):
def _(advertisement):
address_type_string = ('PUBLIC', 'RANDOM', 'PUBLIC_ID', 'RANDOM_ID')[ address_type_string = ('PUBLIC', 'RANDOM', 'PUBLIC_ID', 'RANDOM_ID')[
advertisement.address.address_type advertisement.address.address_type
] ]
@@ -67,10 +71,11 @@ async def main():
f'{advertisement.data.to_string(separator)}' f'{advertisement.data.to_string(separator)}'
) )
device.on('advertisement', on_adv)
await device.power_on() await device.power_on()
await device.start_scanning(filter_duplicates=filter_duplicates) await device.start_scanning(filter_duplicates=filter_duplicates)
await hci_source.wait_for_termination() await hci_transport.source.wait_for_termination()
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------