Compare commits

..

2 Commits

Author SHA1 Message Date
uael 412fd0f78a pandora: implement L2CAP pandora service
Co-authored-by: Josh Wu <joshwu@google.com>
2023-11-07 00:58:33 -08:00
uael ee494a6543 l2cap: refactor server side to allow deferred accept
In order to avoid any breaking changes this re-impl current APIs with
the exact same behavior.

The previous impl was preventing one to defer the response to an l2cap
channel connection request, both for BR/EDR basic channels and LE credit
based ones. This commit change this to spawn a task on every channel
incoming connection request, then all registered listeners are given a
chance to accept it through a `asyncio.Future`. After a bit of delay, if
none had accepted it, the connection is automatically rejected.
2023-11-07 00:43:02 -08:00
118 changed files with 1501 additions and 6006 deletions
+3 -5
View File
@@ -56,7 +56,7 @@ jobs:
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
- name: Install Python dependencies
- name: Install dependencies
run: |
python -m pip install --upgrade pip
python -m pip install ".[build,test,development,documentation]"
@@ -65,17 +65,15 @@ jobs:
with:
components: clippy,rustfmt
toolchain: ${{ matrix.rust-version }}
- name: Install Rust dependencies
run: cargo install cargo-all-features # allows building/testing combinations of features
- name: Check License Headers
run: cd rust && cargo run --features dev-tools --bin file-header check-all
- name: Rust Build
run: cd rust && cargo build --all-targets && cargo build-all-features --all-targets
run: cd rust && cargo build --all-targets && cargo build --all-features --all-targets
# Lints after build so what clippy needs is already built
- name: Rust Lints
run: cd rust && cargo fmt --check && cargo clippy --all-targets -- --deny warnings && cargo clippy --all-features --all-targets -- --deny warnings
- name: Rust Tests
run: cd rust && cargo test-all-features
run: cd rust && cargo test
# At some point, hook up publishing the binary. For now, just make sure it builds.
# Once we're ready to publish binaries, this should be built with `--release`.
- name: Build Bumble CLI
-6
View File
@@ -21,9 +21,7 @@
"cccds",
"cmac",
"CONNECTIONLESS",
"csip",
"csrcs",
"CVSD",
"datagram",
"DATALINK",
"delayreport",
@@ -31,7 +29,6 @@
"deregistration",
"dhkey",
"diversifier",
"endianness",
"Fitbit",
"GATTLINK",
"HANDSFREE",
@@ -41,14 +38,12 @@
"libc",
"libusb",
"MITM",
"MSBC",
"NDIS",
"netsim",
"NONBLOCK",
"NONCONN",
"OXIMETER",
"popleft",
"PRAND",
"protobuf",
"psms",
"pyee",
@@ -60,7 +55,6 @@
"SEID",
"seids",
"SERV",
"SIRK",
"ssrc",
"strerror",
"subband",
+24 -180
View File
@@ -50,10 +50,8 @@ from bumble.sdp import (
SDP_PUBLIC_BROWSE_ROOT,
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
DataElement,
ServiceAttribute,
Client as SdpClient,
)
from bumble.transport import open_transport_or_link
import bumble.rfcomm
@@ -79,7 +77,6 @@ SPEED_SERVICE_UUID = '50DB505C-8AC4-4738-8448-3B1D9CC09CC5'
SPEED_TX_UUID = 'E789C754-41A1-45F4-A948-A0A1A90DBA53'
SPEED_RX_UUID = '016A2CC7-E14B-4819-935F-1F56EAE4098D'
DEFAULT_RFCOMM_UUID = 'E6D55659-C8B4-4B85-96BB-B1143AF6D3AE'
DEFAULT_L2CAP_PSM = 1234
DEFAULT_L2CAP_MAX_CREDITS = 128
DEFAULT_L2CAP_MTU = 1022
@@ -131,16 +128,11 @@ def print_connection(connection):
if connection.transport == BT_LE_TRANSPORT:
phy_state = (
'PHY='
f'TX:{le_phy_name(connection.phy.tx_phy)}/'
f'RX:{le_phy_name(connection.phy.rx_phy)}'
f'RX:{le_phy_name(connection.phy.rx_phy)}/'
f'TX:{le_phy_name(connection.phy.tx_phy)}'
)
data_length = (
'DL=('
f'TX:{connection.data_length[0]}/{connection.data_length[1]},'
f'RX:{connection.data_length[2]}/{connection.data_length[3]}'
')'
)
data_length = f'DL={connection.data_length}'
connection_parameters = (
'Parameters='
f'{connection.parameters.connection_interval * 1.25:.2f}/'
@@ -177,7 +169,9 @@ def make_sdp_records(channel):
),
ServiceAttribute(
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
DataElement.sequence([DataElement.uuid(UUID(DEFAULT_RFCOMM_UUID))]),
DataElement.sequence(
[DataElement.uuid(UUID('E6D55659-C8B4-4B85-96BB-B1143AF6D3AE'))]
),
),
ServiceAttribute(
SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
@@ -197,48 +191,6 @@ def make_sdp_records(channel):
}
async def find_rfcomm_channel_with_uuid(connection: Connection, uuid: str) -> int:
# Connect to the SDP Server
sdp_client = SdpClient(connection)
await sdp_client.connect()
# Search for services with an L2CAP service attribute
search_result = await sdp_client.search_attributes(
[BT_L2CAP_PROTOCOL_ID],
[
SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
],
)
for attribute_list in search_result:
service_uuid = None
service_class_id_list = ServiceAttribute.find_attribute_in_list(
attribute_list, SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID
)
if service_class_id_list:
if service_class_id_list.value:
for service_class_id in service_class_id_list.value:
service_uuid = service_class_id.value
if str(service_uuid) != uuid:
# This service doesn't have a UUID or isn't the right one.
continue
# Look for the RFCOMM Channel number
protocol_descriptor_list = ServiceAttribute.find_attribute_in_list(
attribute_list, SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID
)
if protocol_descriptor_list:
for protocol_descriptor in protocol_descriptor_list.value:
if len(protocol_descriptor.value) >= 2:
if protocol_descriptor.value[0].value == BT_RFCOMM_PROTOCOL_ID:
await sdp_client.disconnect()
return protocol_descriptor.value[1].value
await sdp_client.disconnect()
return 0
class PacketType(enum.IntEnum):
RESET = 0
SEQUENCE = 1
@@ -272,7 +224,7 @@ class Sender:
if self.tx_start_delay:
print(color(f'*** Startup delay: {self.tx_start_delay}', 'blue'))
await asyncio.sleep(self.tx_start_delay)
await asyncio.sleep(self.tx_start_delay) # FIXME
print(color('=== Sending RESET', 'magenta'))
await self.packet_io.send_packet(bytes([PacketType.RESET]))
@@ -412,7 +364,7 @@ class Ping:
if self.tx_start_delay:
print(color(f'*** Startup delay: {self.tx_start_delay}', 'blue'))
await asyncio.sleep(self.tx_start_delay)
await asyncio.sleep(self.tx_start_delay) # FIXME
print(color('=== Sending RESET', 'magenta'))
await self.packet_io.send_packet(bytes([PacketType.RESET]))
@@ -758,14 +710,14 @@ class L2capServer(StreamedPacketIO):
self.l2cap_channel = None
self.ready = asyncio.Event()
# Listen for incoming L2CAP connections
# Listen for incoming L2CAP CoC connections
device.create_l2cap_server(
spec=l2cap.LeCreditBasedChannelSpec(
psm=psm, mtu=mtu, mps=mps, max_credits=max_credits
),
handler=self.on_l2cap_channel,
)
print(color(f'### Listening for L2CAP connection on PSM {psm}', 'yellow'))
print(color(f'### Listening for CoC connection on PSM {psm}', 'yellow'))
async def on_connection(self, connection):
connection.on('disconnection', self.on_disconnection)
@@ -791,35 +743,21 @@ class L2capServer(StreamedPacketIO):
# RfcommClient
# -----------------------------------------------------------------------------
class RfcommClient(StreamedPacketIO):
def __init__(self, device, channel, uuid):
def __init__(self, device):
super().__init__()
self.device = device
self.channel = channel
self.uuid = uuid
self.ready = asyncio.Event()
async def on_connection(self, connection):
connection.on('disconnection', self.on_disconnection)
# Find the channel number if not specified
channel = self.channel
if channel == 0:
print(
color(f'@@@ Discovering channel number from UUID {self.uuid}', 'cyan')
)
channel = await find_rfcomm_channel_with_uuid(connection, self.uuid)
print(color(f'@@@ Channel number = {channel}', 'cyan'))
if channel == 0:
print(color('!!! No RFComm service with this UUID found', 'red'))
await connection.disconnect()
return
# Create a client and start it
print(color('*** Starting RFCOMM client...', 'blue'))
rfcomm_client = bumble.rfcomm.Client(connection)
rfcomm_client = bumble.rfcomm.Client(self.device, connection)
rfcomm_mux = await rfcomm_client.start()
print(color('*** Started', 'blue'))
channel = DEFAULT_RFCOMM_CHANNEL
print(color(f'### Opening session for channel {channel}...', 'yellow'))
try:
rfcomm_session = await rfcomm_mux.open_dlc(channel)
@@ -842,7 +780,7 @@ class RfcommClient(StreamedPacketIO):
# RfcommServer
# -----------------------------------------------------------------------------
class RfcommServer(StreamedPacketIO):
def __init__(self, device, channel):
def __init__(self, device):
super().__init__()
self.ready = asyncio.Event()
@@ -850,7 +788,7 @@ class RfcommServer(StreamedPacketIO):
rfcomm_server = bumble.rfcomm.Server(device)
# Listen for incoming DLC connections
channel_number = rfcomm_server.listen(self.on_dlc, channel)
channel_number = rfcomm_server.listen(self.on_dlc, DEFAULT_RFCOMM_CHANNEL)
# Setup the SDP to advertise this channel
device.sdp_service_records = make_sdp_records(channel_number)
@@ -887,9 +825,6 @@ class Central(Connection.Listener):
mode_factory,
connection_interval,
phy,
authenticate,
encrypt,
extended_data_length,
):
super().__init__()
self.transport = transport
@@ -897,9 +832,6 @@ class Central(Connection.Listener):
self.classic = classic
self.role_factory = role_factory
self.mode_factory = mode_factory
self.authenticate = authenticate
self.encrypt = encrypt or authenticate
self.extended_data_length = extended_data_length
self.device = None
self.connection = None
@@ -972,26 +904,7 @@ class Central(Connection.Listener):
self.connection.listener = self
print_connection(self.connection)
# Request a new data length if requested
if self.extended_data_length:
print(color('+++ Requesting extended data length', 'cyan'))
await self.connection.set_data_length(
self.extended_data_length[0], self.extended_data_length[1]
)
# Authenticate if requested
if self.authenticate:
# Request authentication
print(color('*** Authenticating...', 'cyan'))
await self.connection.authenticate()
print(color('*** Authenticated', 'cyan'))
# Encrypt if requested
if self.encrypt:
# Enable encryption
print(color('*** Enabling encryption...', 'cyan'))
await self.connection.encrypt()
print(color('*** Encryption on', 'cyan'))
await mode.on_connection(self.connection)
# Set the PHY if requested
if self.phy is not None:
@@ -1006,8 +919,6 @@ class Central(Connection.Listener):
)
)
await mode.on_connection(self.connection)
await role.run()
await asyncio.sleep(DEFAULT_LINGER_TIME)
@@ -1032,12 +943,9 @@ class Central(Connection.Listener):
# Peripheral
# -----------------------------------------------------------------------------
class Peripheral(Device.Listener, Connection.Listener):
def __init__(
self, transport, classic, extended_data_length, role_factory, mode_factory
):
def __init__(self, transport, classic, role_factory, mode_factory):
self.transport = transport
self.classic = classic
self.extended_data_length = extended_data_length
self.role_factory = role_factory
self.role = None
self.mode_factory = mode_factory
@@ -1098,15 +1006,6 @@ class Peripheral(Device.Listener, Connection.Listener):
self.connection = connection
self.connected.set()
# Request a new data length if needed
if self.extended_data_length:
print("+++ Requesting extended data length")
AsyncRunner.spawn(
connection.set_data_length(
self.extended_data_length[0], self.extended_data_length[1]
)
)
def on_disconnection(self, reason):
print(color(f'!!! Disconnection: reason={reason}', 'red'))
self.connection = None
@@ -1139,18 +1038,16 @@ def create_mode_factory(ctx, default_mode):
return GattServer(device)
if mode == 'l2cap-client':
return L2capClient(device, psm=ctx.obj['l2cap_psm'])
return L2capClient(device)
if mode == 'l2cap-server':
return L2capServer(device, psm=ctx.obj['l2cap_psm'])
return L2capServer(device)
if mode == 'rfcomm-client':
return RfcommClient(
device, channel=ctx.obj['rfcomm_channel'], uuid=ctx.obj['rfcomm_uuid']
)
return RfcommClient(device)
if mode == 'rfcomm-server':
return RfcommServer(device, channel=ctx.obj['rfcomm_channel'])
return RfcommServer(device)
raise ValueError('invalid mode')
@@ -1216,27 +1113,6 @@ def create_role_factory(ctx, default_role):
type=click.IntRange(23, 517),
help='GATT MTU (gatt-client mode)',
)
@click.option(
'--extended-data-length',
help='Request a data length upon connection, specified as tx_octets/tx_time',
)
@click.option(
'--rfcomm-channel',
type=int,
default=DEFAULT_RFCOMM_CHANNEL,
help='RFComm channel to use',
)
@click.option(
'--rfcomm-uuid',
default=DEFAULT_RFCOMM_UUID,
help='RFComm service UUID to use (ignored is --rfcomm-channel is not 0)',
)
@click.option(
'--l2cap-psm',
type=int,
default=DEFAULT_L2CAP_PSM,
help='L2CAP PSM to use',
)
@click.option(
'--packet-size',
'-s',
@@ -1263,36 +1139,17 @@ def create_role_factory(ctx, default_role):
)
@click.pass_context
def bench(
ctx,
device_config,
role,
mode,
att_mtu,
extended_data_length,
packet_size,
packet_count,
start_delay,
rfcomm_channel,
rfcomm_uuid,
l2cap_psm,
ctx, device_config, role, mode, att_mtu, packet_size, packet_count, start_delay
):
ctx.ensure_object(dict)
ctx.obj['device_config'] = device_config
ctx.obj['role'] = role
ctx.obj['mode'] = mode
ctx.obj['att_mtu'] = att_mtu
ctx.obj['rfcomm_channel'] = rfcomm_channel
ctx.obj['rfcomm_uuid'] = rfcomm_uuid
ctx.obj['l2cap_psm'] = l2cap_psm
ctx.obj['packet_size'] = packet_size
ctx.obj['packet_count'] = packet_count
ctx.obj['start_delay'] = start_delay
ctx.obj['extended_data_length'] = (
[int(x) for x in extended_data_length.split('/')]
if extended_data_length
else None
)
ctx.obj['classic'] = mode in ('rfcomm-client', 'rfcomm-server')
@@ -1313,12 +1170,8 @@ def bench(
help='Connection interval (in ms)',
)
@click.option('--phy', type=click.Choice(['1m', '2m', 'coded']), help='PHY to use')
@click.option('--authenticate', is_flag=True, help='Authenticate (RFComm only)')
@click.option('--encrypt', is_flag=True, help='Encrypt the connection (RFComm only)')
@click.pass_context
def central(
ctx, transport, peripheral_address, connection_interval, phy, authenticate, encrypt
):
def central(ctx, transport, peripheral_address, connection_interval, phy):
"""Run as a central (initiates the connection)"""
role_factory = create_role_factory(ctx, 'sender')
mode_factory = create_mode_factory(ctx, 'gatt-client')
@@ -1333,9 +1186,6 @@ def central(
mode_factory,
connection_interval,
phy,
authenticate,
encrypt or authenticate,
ctx.obj['extended_data_length'],
).run()
)
@@ -1349,13 +1199,7 @@ def peripheral(ctx, transport):
mode_factory = create_mode_factory(ctx, 'gatt-server')
asyncio.run(
Peripheral(
transport,
ctx.obj['classic'],
ctx.obj['extended_data_length'],
role_factory,
mode_factory,
).run()
Peripheral(transport, ctx.obj['classic'], role_factory, mode_factory).run()
)
-14
View File
@@ -42,8 +42,6 @@ from bumble.hci import (
HCI_LE_Read_Number_Of_Supported_Advertising_Sets_Command,
HCI_LE_READ_MAXIMUM_ADVERTISING_DATA_LENGTH_COMMAND,
HCI_LE_Read_Maximum_Advertising_Data_Length_Command,
HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND,
HCI_LE_Read_Suggested_Default_Data_Length_Command,
)
from bumble.host import Host
from bumble.transport import open_transport_or_link
@@ -119,18 +117,6 @@ async def get_le_info(host):
'\n',
)
if host.supports_command(HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND):
response = await host.send_command(
HCI_LE_Read_Suggested_Default_Data_Length_Command()
)
if command_succeeded(response):
print(
color('Suggested Default Data Length:', 'yellow'),
f'{response.return_parameters.suggested_max_tx_octets}/'
f'{response.return_parameters.suggested_max_tx_time}',
'\n',
)
print(color('LE Features:', 'yellow'))
for feature in host.supported_le_features:
print(' ', name_or_number(HCI_LE_SUPPORTED_FEATURES_NAMES, feature))
+7 -66
View File
@@ -24,16 +24,10 @@ from prompt_toolkit.shortcuts import PromptSession
from bumble.colors import color
from bumble.device import Device, Peer
from bumble.transport import open_transport_or_link
from bumble.pairing import OobData, PairingDelegate, PairingConfig
from bumble.smp import OobContext, OobLegacyContext
from bumble.pairing import PairingDelegate, PairingConfig
from bumble.smp import error_name as smp_error_name
from bumble.keys import JsonKeyStore
from bumble.core import (
AdvertisingData,
ProtocolError,
BT_LE_TRANSPORT,
BT_BR_EDR_TRANSPORT,
)
from bumble.core import ProtocolError
from bumble.gatt import (
GATT_DEVICE_NAME_CHARACTERISTIC,
GATT_GENERIC_ACCESS_SERVICE,
@@ -66,7 +60,7 @@ class Waiter:
class Delegate(PairingDelegate):
def __init__(self, mode, connection, capability_string, do_prompt):
super().__init__(
io_capability={
{
'keyboard': PairingDelegate.KEYBOARD_INPUT_ONLY,
'display': PairingDelegate.DISPLAY_OUTPUT_ONLY,
'display+keyboard': PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT,
@@ -291,9 +285,7 @@ async def pair(
mitm,
bond,
ctkd,
linger,
io,
oob,
prompt,
request,
print_keys,
@@ -351,52 +343,16 @@ async def pair(
await device.keystore.print(prefix=color('@@@ ', 'blue'))
print(color('@@@-----------------------------------', 'blue'))
# Create an OOB context if needed
if oob:
our_oob_context = OobContext()
shared_data = (
None
if oob == '-'
else OobData.from_ad(AdvertisingData.from_bytes(bytes.fromhex(oob)))
)
legacy_context = OobLegacyContext()
oob_contexts = PairingConfig.OobConfig(
our_context=our_oob_context,
peer_data=shared_data,
legacy_context=legacy_context,
)
oob_data = OobData(
address=device.random_address,
shared_data=shared_data,
legacy_context=legacy_context,
)
print(color('@@@-----------------------------------', 'yellow'))
print(color('@@@ OOB Data:', 'yellow'))
print(color(f'@@@ {our_oob_context.share()}', 'yellow'))
print(color(f'@@@ TK={legacy_context.tk.hex()}', 'yellow'))
print(color(f'@@@ HEX: ({bytes(oob_data.to_ad()).hex()})', 'yellow'))
print(color('@@@-----------------------------------', 'yellow'))
else:
oob_contexts = None
# Set up a pairing config factory
device.pairing_config_factory = lambda connection: PairingConfig(
sc=sc,
mitm=mitm,
bonding=bond,
oob=oob_contexts,
delegate=Delegate(mode, connection, io, prompt),
sc, mitm, bond, Delegate(mode, connection, io, prompt)
)
# Connect to a peer or wait for a connection
device.on('connection', lambda connection: on_connection(connection, request))
if address_or_name is not None:
print(color(f'=== Connecting to {address_or_name}...', 'green'))
connection = await device.connect(
address_or_name,
transport=BT_LE_TRANSPORT if mode == 'le' else BT_BR_EDR_TRANSPORT,
)
pairing_failure = False
connection = await device.connect(address_or_name)
if not request:
try:
@@ -404,12 +360,10 @@ async def pair(
await connection.pair()
else:
await connection.authenticate()
return
except ProtocolError as error:
pairing_failure = True
print(color(f'Pairing failed: {error}', 'red'))
if not linger or pairing_failure:
return
return
else:
if mode == 'le':
# Advertise so that peers can find us and connect
@@ -459,7 +413,6 @@ class LogHandler(logging.Handler):
help='Enable CTKD',
show_default=True,
)
@click.option('--linger', default=True, is_flag=True, help='Linger after pairing')
@click.option(
'--io',
type=click.Choice(
@@ -468,14 +421,6 @@ class LogHandler(logging.Handler):
default='display+keyboard',
show_default=True,
)
@click.option(
'--oob',
metavar='<oob-data-hex>',
help=(
'Use OOB pairing with this data from the peer '
'(use "-" to enable OOB without peer data)'
),
)
@click.option('--prompt', is_flag=True, help='Prompt to accept/reject pairing request')
@click.option(
'--request', is_flag=True, help='Request that the connecting peer initiate pairing'
@@ -495,9 +440,7 @@ def main(
mitm,
bond,
ctkd,
linger,
io,
oob,
prompt,
request,
print_keys,
@@ -520,9 +463,7 @@ def main(
mitm,
bond,
ctkd,
linger,
io,
oob,
prompt,
request,
print_keys,
+68 -83
View File
@@ -15,13 +15,9 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import dataclasses
import struct
import logging
from collections.abc import AsyncGenerator
from typing import List, Callable, Awaitable
from collections import namedtuple
from .company_ids import COMPANY_IDENTIFIERS
from .sdp import (
@@ -243,20 +239,24 @@ def make_audio_sink_service_sdp_records(service_record_handle, version=(1, 3)):
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class SbcMediaCodecInformation:
class SbcMediaCodecInformation(
namedtuple(
'SbcMediaCodecInformation',
[
'sampling_frequency',
'channel_mode',
'block_length',
'subbands',
'allocation_method',
'minimum_bitpool_value',
'maximum_bitpool_value',
],
)
):
'''
A2DP spec - 4.3.2 Codec Specific Information Elements
'''
sampling_frequency: int
channel_mode: int
block_length: int
subbands: int
allocation_method: int
minimum_bitpool_value: int
maximum_bitpool_value: int
SAMPLING_FREQUENCY_BITS = {16000: 1 << 3, 32000: 1 << 2, 44100: 1 << 1, 48000: 1}
CHANNEL_MODE_BITS = {
SBC_MONO_CHANNEL_MODE: 1 << 3,
@@ -272,7 +272,7 @@ class SbcMediaCodecInformation:
}
@staticmethod
def from_bytes(data: bytes) -> SbcMediaCodecInformation:
def from_bytes(data: bytes) -> 'SbcMediaCodecInformation':
sampling_frequency = (data[0] >> 4) & 0x0F
channel_mode = (data[0] >> 0) & 0x0F
block_length = (data[1] >> 4) & 0x0F
@@ -293,14 +293,14 @@ class SbcMediaCodecInformation:
@classmethod
def from_discrete_values(
cls,
sampling_frequency: int,
channel_mode: int,
block_length: int,
subbands: int,
allocation_method: int,
minimum_bitpool_value: int,
maximum_bitpool_value: int,
) -> SbcMediaCodecInformation:
sampling_frequency,
channel_mode,
block_length,
subbands,
allocation_method,
minimum_bitpool_value,
maximum_bitpool_value,
):
return SbcMediaCodecInformation(
sampling_frequency=cls.SAMPLING_FREQUENCY_BITS[sampling_frequency],
channel_mode=cls.CHANNEL_MODE_BITS[channel_mode],
@@ -314,14 +314,14 @@ class SbcMediaCodecInformation:
@classmethod
def from_lists(
cls,
sampling_frequencies: List[int],
channel_modes: List[int],
block_lengths: List[int],
subbands: List[int],
allocation_methods: List[int],
minimum_bitpool_value: int,
maximum_bitpool_value: int,
) -> SbcMediaCodecInformation:
sampling_frequencies,
channel_modes,
block_lengths,
subbands,
allocation_methods,
minimum_bitpool_value,
maximum_bitpool_value,
):
return SbcMediaCodecInformation(
sampling_frequency=sum(
cls.SAMPLING_FREQUENCY_BITS[x] for x in sampling_frequencies
@@ -348,7 +348,7 @@ class SbcMediaCodecInformation:
]
)
def __str__(self) -> str:
def __str__(self):
channel_modes = ['MONO', 'DUAL_CHANNEL', 'STEREO', 'JOINT_STEREO']
allocation_methods = ['SNR', 'Loudness']
return '\n'.join(
@@ -367,19 +367,16 @@ class SbcMediaCodecInformation:
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class AacMediaCodecInformation:
class AacMediaCodecInformation(
namedtuple(
'AacMediaCodecInformation',
['object_type', 'sampling_frequency', 'channels', 'rfa', 'vbr', 'bitrate'],
)
):
'''
A2DP spec - 4.5.2 Codec Specific Information Elements
'''
object_type: int
sampling_frequency: int
channels: int
rfa: int
vbr: int
bitrate: int
OBJECT_TYPE_BITS = {
MPEG_2_AAC_LC_OBJECT_TYPE: 1 << 7,
MPEG_4_AAC_LC_OBJECT_TYPE: 1 << 6,
@@ -403,7 +400,7 @@ class AacMediaCodecInformation:
CHANNELS_BITS = {1: 1 << 1, 2: 1}
@staticmethod
def from_bytes(data: bytes) -> AacMediaCodecInformation:
def from_bytes(data: bytes) -> 'AacMediaCodecInformation':
object_type = data[0]
sampling_frequency = (data[1] << 4) | ((data[2] >> 4) & 0x0F)
channels = (data[2] >> 2) & 0x03
@@ -416,13 +413,8 @@ class AacMediaCodecInformation:
@classmethod
def from_discrete_values(
cls,
object_type: int,
sampling_frequency: int,
channels: int,
vbr: int,
bitrate: int,
) -> AacMediaCodecInformation:
cls, object_type, sampling_frequency, channels, vbr, bitrate
):
return AacMediaCodecInformation(
object_type=cls.OBJECT_TYPE_BITS[object_type],
sampling_frequency=cls.SAMPLING_FREQUENCY_BITS[sampling_frequency],
@@ -433,14 +425,7 @@ class AacMediaCodecInformation:
)
@classmethod
def from_lists(
cls,
object_types: List[int],
sampling_frequencies: List[int],
channels: List[int],
vbr: int,
bitrate: int,
) -> AacMediaCodecInformation:
def from_lists(cls, object_types, sampling_frequencies, channels, vbr, bitrate):
return AacMediaCodecInformation(
object_type=sum(cls.OBJECT_TYPE_BITS[x] for x in object_types),
sampling_frequency=sum(
@@ -464,7 +449,7 @@ class AacMediaCodecInformation:
]
)
def __str__(self) -> str:
def __str__(self):
object_types = [
'MPEG_2_AAC_LC',
'MPEG_4_AAC_LC',
@@ -489,26 +474,26 @@ class AacMediaCodecInformation:
)
@dataclasses.dataclass
# -----------------------------------------------------------------------------
class VendorSpecificMediaCodecInformation:
'''
A2DP spec - 4.7.2 Codec Specific Information Elements
'''
vendor_id: int
codec_id: int
value: bytes
@staticmethod
def from_bytes(data: bytes) -> VendorSpecificMediaCodecInformation:
def from_bytes(data):
(vendor_id, codec_id) = struct.unpack_from('<IH', data, 0)
return VendorSpecificMediaCodecInformation(vendor_id, codec_id, data[6:])
def __bytes__(self) -> bytes:
def __init__(self, vendor_id, codec_id, value):
self.vendor_id = vendor_id
self.codec_id = codec_id
self.value = value
def __bytes__(self):
return struct.pack('<IH', self.vendor_id, self.codec_id, self.value)
def __str__(self) -> str:
def __str__(self):
# pylint: disable=line-too-long
return '\n'.join(
[
@@ -521,27 +506,29 @@ class VendorSpecificMediaCodecInformation:
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class SbcFrame:
sampling_frequency: int
block_count: int
channel_mode: int
subband_count: int
payload: bytes
def __init__(
self, sampling_frequency, block_count, channel_mode, subband_count, payload
):
self.sampling_frequency = sampling_frequency
self.block_count = block_count
self.channel_mode = channel_mode
self.subband_count = subband_count
self.payload = payload
@property
def sample_count(self) -> int:
def sample_count(self):
return self.subband_count * self.block_count
@property
def bitrate(self) -> int:
def bitrate(self):
return 8 * ((len(self.payload) * self.sampling_frequency) // self.sample_count)
@property
def duration(self) -> float:
def duration(self):
return self.sample_count / self.sampling_frequency
def __str__(self) -> str:
def __str__(self):
return (
f'SBC(sf={self.sampling_frequency},'
f'cm={self.channel_mode},'
@@ -553,12 +540,12 @@ class SbcFrame:
# -----------------------------------------------------------------------------
class SbcParser:
def __init__(self, read: Callable[[int], Awaitable[bytes]]) -> None:
def __init__(self, read):
self.read = read
@property
def frames(self) -> AsyncGenerator[SbcFrame, None]:
async def generate_frames() -> AsyncGenerator[SbcFrame, None]:
def frames(self):
async def generate_frames():
while True:
# Read 4 bytes of header
header = await self.read(4)
@@ -602,9 +589,7 @@ class SbcParser:
# -----------------------------------------------------------------------------
class SbcPacketSource:
def __init__(
self, read: Callable[[int], Awaitable[bytes]], mtu: int, codec_capabilities
) -> None:
def __init__(self, read, mtu, codec_capabilities):
self.read = read
self.mtu = mtu
self.codec_capabilities = codec_capabilities
+3 -3
View File
@@ -250,15 +250,15 @@ async def find_avdtp_service_with_sdp_client(
# -----------------------------------------------------------------------------
async def find_avdtp_service_with_connection(
connection: device.Connection,
device: device.Device, connection: device.Connection
) -> Optional[Tuple[int, int]]:
'''
Find an AVDTP service, for a connection, and return its version,
or None if none is found
'''
sdp_client = sdp.Client(connection)
await sdp_client.connect()
sdp_client = sdp.Client(device)
await sdp_client.connect(connection)
service_version = await find_avdtp_service_with_sdp_client(sdp_client)
await sdp_client.disconnect()
-3
View File
@@ -1000,9 +1000,6 @@ class Controller:
'''
See Bluetooth spec Vol 4, Part E - 7.8.10 LE Set Scan Parameters Command
'''
if self.le_scan_enable:
return bytes([HCI_COMMAND_DISALLOWED_ERROR])
self.le_scan_type = command.le_scan_type
self.le_scan_interval = command.le_scan_interval
self.le_scan_window = command.le_scan_window
-11
View File
@@ -16,7 +16,6 @@
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import enum
import struct
from typing import List, Optional, Tuple, Union, cast, Dict
@@ -1052,13 +1051,3 @@ class ConnectionPHY:
def __str__(self):
return f'ConnectionPHY(tx_phy={self.tx_phy}, rx_phy={self.rx_phy})'
# -----------------------------------------------------------------------------
# LE Role
# -----------------------------------------------------------------------------
class LeRole(enum.IntEnum):
PERIPHERAL_ONLY = 0x00
CENTRAL_ONLY = 0x01
BOTH_PERIPHERAL_PREFERRED = 0x02
BOTH_CENTRAL_PREFERRED = 0x03
+66 -82
View File
@@ -21,8 +21,6 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import logging
import operator
@@ -31,13 +29,11 @@ from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.asymmetric.ec import (
generate_private_key,
ECDH,
EllipticCurvePrivateKey,
EllipticCurvePublicNumbers,
EllipticCurvePrivateNumbers,
SECP256R1,
)
from cryptography.hazmat.primitives import cmac
from typing import Tuple
# -----------------------------------------------------------------------------
@@ -50,18 +46,16 @@ logger = logging.getLogger(__name__)
# Classes
# -----------------------------------------------------------------------------
class EccKey:
def __init__(self, private_key: EllipticCurvePrivateKey) -> None:
def __init__(self, private_key):
self.private_key = private_key
@classmethod
def generate(cls) -> EccKey:
def generate(cls):
private_key = generate_private_key(SECP256R1())
return cls(private_key)
@classmethod
def from_private_key_bytes(
cls, d_bytes: bytes, x_bytes: bytes, y_bytes: bytes
) -> EccKey:
def from_private_key_bytes(cls, d_bytes, x_bytes, y_bytes):
d = int.from_bytes(d_bytes, byteorder='big', signed=False)
x = int.from_bytes(x_bytes, byteorder='big', signed=False)
y = int.from_bytes(y_bytes, byteorder='big', signed=False)
@@ -71,7 +65,7 @@ class EccKey:
return cls(private_key)
@property
def x(self) -> bytes:
def x(self):
return (
self.private_key.public_key()
.public_numbers()
@@ -79,14 +73,14 @@ class EccKey:
)
@property
def y(self) -> bytes:
def y(self):
return (
self.private_key.public_key()
.public_numbers()
.y.to_bytes(32, byteorder='big')
)
def dh(self, public_key_x: bytes, public_key_y: bytes) -> bytes:
def dh(self, public_key_x, public_key_y):
x = int.from_bytes(public_key_x, byteorder='big', signed=False)
y = int.from_bytes(public_key_y, byteorder='big', signed=False)
public_key = EllipticCurvePublicNumbers(x, y, SECP256R1()).public_key()
@@ -99,23 +93,14 @@ class EccKey:
# Functions
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
def xor(x: bytes, y: bytes) -> bytes:
def xor(x, y):
assert len(x) == len(y)
return bytes(map(operator.xor, x, y))
# -----------------------------------------------------------------------------
def reverse(input: bytes) -> bytes:
'''
Returns bytes of input in reversed endianness.
'''
return input[::-1]
# -----------------------------------------------------------------------------
def r() -> bytes:
def r():
'''
Generate 16 bytes of random data
'''
@@ -123,20 +108,20 @@ def r() -> bytes:
# -----------------------------------------------------------------------------
def e(key: bytes, data: bytes) -> bytes:
def e(key, data):
'''
AES-128 ECB, expecting byte-swapped inputs and producing a byte-swapped output.
See Bluetooth spec Vol 3, Part H - 2.2.1 Security function e
'''
cipher = Cipher(algorithms.AES(reverse(key)), modes.ECB())
cipher = Cipher(algorithms.AES(bytes(reversed(key))), modes.ECB())
encryptor = cipher.encryptor()
return reverse(encryptor.update(reverse(data)))
return bytes(reversed(encryptor.update(bytes(reversed(data)))))
# -----------------------------------------------------------------------------
def ah(k: bytes, r: bytes) -> bytes: # pylint: disable=redefined-outer-name
def ah(k, r): # pylint: disable=redefined-outer-name
'''
See Bluetooth spec Vol 3, Part H - 2.2.2 Random Address Hash function ah
'''
@@ -147,16 +132,7 @@ def ah(k: bytes, r: bytes) -> bytes: # pylint: disable=redefined-outer-name
# -----------------------------------------------------------------------------
def c1(
k: bytes,
r: bytes,
preq: bytes,
pres: bytes,
iat: int,
rat: int,
ia: bytes,
ra: bytes,
) -> bytes: # pylint: disable=redefined-outer-name
def c1(k, r, preq, pres, iat, rat, ia, ra): # pylint: disable=redefined-outer-name
'''
See Bluetooth spec, Vol 3, Part H - 2.2.3 Confirm value generation function c1 for
LE Legacy Pairing
@@ -168,7 +144,7 @@ def c1(
# -----------------------------------------------------------------------------
def s1(k: bytes, r1: bytes, r2: bytes) -> bytes:
def s1(k, r1, r2):
'''
See Bluetooth spec, Vol 3, Part H - 2.2.4 Key generation function s1 for LE Legacy
Pairing
@@ -178,7 +154,7 @@ def s1(k: bytes, r1: bytes, r2: bytes) -> bytes:
# -----------------------------------------------------------------------------
def aes_cmac(m: bytes, k: bytes) -> bytes:
def aes_cmac(m, k):
'''
See Bluetooth spec, Vol 3, Part H - 2.2.5 FunctionAES-CMAC
@@ -190,16 +166,20 @@ def aes_cmac(m: bytes, k: bytes) -> bytes:
# -----------------------------------------------------------------------------
def f4(u: bytes, v: bytes, x: bytes, z: bytes) -> bytes:
def f4(u, v, x, z):
'''
See Bluetooth spec, Vol 3, Part H - 2.2.6 LE Secure Connections Confirm Value
Generation Function f4
'''
return reverse(aes_cmac(reverse(u) + reverse(v) + z, reverse(x)))
return bytes(
reversed(
aes_cmac(bytes(reversed(u)) + bytes(reversed(v)) + z, bytes(reversed(x)))
)
)
# -----------------------------------------------------------------------------
def f5(w: bytes, n1: bytes, n2: bytes, a1: bytes, a2: bytes) -> Tuple[bytes, bytes]:
def f5(w, n1, n2, a1, a2):
'''
See Bluetooth spec, Vol 3, Part H - 2.2.7 LE Secure Connections Key Generation
Function f5
@@ -207,83 +187,87 @@ def f5(w: bytes, n1: bytes, n2: bytes, a1: bytes, a2: bytes) -> Tuple[bytes, byt
NOTE: this returns a tuple: (MacKey, LTK) in little-endian byte order
'''
salt = bytes.fromhex('6C888391AAF5A53860370BDB5A6083BE')
t = aes_cmac(reverse(w), salt)
t = aes_cmac(bytes(reversed(w)), salt)
key_id = bytes([0x62, 0x74, 0x6C, 0x65])
return (
reverse(
aes_cmac(
bytes([0])
+ key_id
+ reverse(n1)
+ reverse(n2)
+ reverse(a1)
+ reverse(a2)
+ bytes([1, 0]),
t,
bytes(
reversed(
aes_cmac(
bytes([0])
+ key_id
+ bytes(reversed(n1))
+ bytes(reversed(n2))
+ bytes(reversed(a1))
+ bytes(reversed(a2))
+ bytes([1, 0]),
t,
)
)
),
reverse(
aes_cmac(
bytes([1])
+ key_id
+ reverse(n1)
+ reverse(n2)
+ reverse(a1)
+ reverse(a2)
+ bytes([1, 0]),
t,
bytes(
reversed(
aes_cmac(
bytes([1])
+ key_id
+ bytes(reversed(n1))
+ bytes(reversed(n2))
+ bytes(reversed(a1))
+ bytes(reversed(a2))
+ bytes([1, 0]),
t,
)
)
),
)
# -----------------------------------------------------------------------------
def f6(
w: bytes, n1: bytes, n2: bytes, r: bytes, io_cap: bytes, a1: bytes, a2: bytes
) -> bytes: # pylint: disable=redefined-outer-name
def f6(w, n1, n2, r, io_cap, a1, a2): # pylint: disable=redefined-outer-name
'''
See Bluetooth spec, Vol 3, Part H - 2.2.8 LE Secure Connections Check Value
Generation Function f6
'''
return reverse(
aes_cmac(
reverse(n1)
+ reverse(n2)
+ reverse(r)
+ reverse(io_cap)
+ reverse(a1)
+ reverse(a2),
reverse(w),
return bytes(
reversed(
aes_cmac(
bytes(reversed(n1))
+ bytes(reversed(n2))
+ bytes(reversed(r))
+ bytes(reversed(io_cap))
+ bytes(reversed(a1))
+ bytes(reversed(a2)),
bytes(reversed(w)),
)
)
)
# -----------------------------------------------------------------------------
def g2(u: bytes, v: bytes, x: bytes, y: bytes) -> int:
def g2(u, v, x, y):
'''
See Bluetooth spec, Vol 3, Part H - 2.2.9 LE Secure Connections Numeric Comparison
Value Generation Function g2
'''
return int.from_bytes(
aes_cmac(
reverse(u) + reverse(v) + reverse(y),
reverse(x),
bytes(reversed(u)) + bytes(reversed(v)) + bytes(reversed(y)),
bytes(reversed(x)),
)[-4:],
byteorder='big',
)
# -----------------------------------------------------------------------------
def h6(w: bytes, key_id: bytes) -> bytes:
def h6(w, key_id):
'''
See Bluetooth spec, Vol 3, Part H - 2.2.10 Link key conversion function h6
'''
return reverse(aes_cmac(key_id, reverse(w)))
return aes_cmac(key_id, w)
# -----------------------------------------------------------------------------
def h7(salt: bytes, w: bytes) -> bytes:
def h7(salt, w):
'''
See Bluetooth spec, Vol 3, Part H - 2.2.11 Link key conversion function h7
'''
return reverse(aes_cmac(reverse(w), salt))
return aes_cmac(w, salt)
+48 -597
View File
@@ -21,9 +21,8 @@ import functools
import json
import asyncio
import logging
from contextlib import asynccontextmanager, AsyncExitStack, closing
from contextlib import asynccontextmanager, AsyncExitStack
from dataclasses import dataclass
from collections.abc import Iterable
from typing import (
Any,
Callable,
@@ -33,8 +32,6 @@ from typing import (
Optional,
Tuple,
Type,
TypeVar,
Set,
Union,
cast,
overload,
@@ -49,7 +46,6 @@ from .hci import (
HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE,
HCI_CENTRAL_ROLE,
HCI_COMMAND_STATUS_PENDING,
HCI_CONNECTED_ISOCHRONOUS_STREAM_LE_SUPPORTED_FEATURE,
HCI_CONNECTION_REJECTED_DUE_TO_LIMITED_RESOURCES_ERROR,
HCI_DISPLAY_YES_NO_IO_CAPABILITY,
HCI_DISPLAY_ONLY_IO_CAPABILITY,
@@ -86,44 +82,30 @@ from .hci import (
HCI_Constant,
HCI_Create_Connection_Cancel_Command,
HCI_Create_Connection_Command,
HCI_Create_Connection_Command,
HCI_Disconnect_Command,
HCI_Encryption_Change_Event,
HCI_Error,
HCI_IO_Capability_Request_Reply_Command,
HCI_Inquiry_Cancel_Command,
HCI_Inquiry_Command,
HCI_IsoDataPacket,
HCI_LE_Accept_CIS_Request_Command,
HCI_LE_Add_Device_To_Resolving_List_Command,
HCI_LE_Advertising_Report_Event,
HCI_LE_Clear_Resolving_List_Command,
HCI_LE_Connection_Update_Command,
HCI_LE_Create_Connection_Cancel_Command,
HCI_LE_Create_Connection_Command,
HCI_LE_Create_CIS_Command,
HCI_LE_Enable_Encryption_Command,
HCI_LE_Extended_Advertising_Report_Event,
HCI_LE_Extended_Create_Connection_Command,
HCI_LE_Rand_Command,
HCI_LE_Read_PHY_Command,
HCI_LE_Reject_CIS_Request_Command,
HCI_LE_Remove_Advertising_Set_Command,
HCI_LE_Set_Address_Resolution_Enable_Command,
HCI_LE_Set_Advertising_Data_Command,
HCI_LE_Set_Advertising_Enable_Command,
HCI_LE_Set_Advertising_Parameters_Command,
HCI_LE_Set_Advertising_Set_Random_Address_Command,
HCI_LE_Set_CIG_Parameters_Command,
HCI_LE_Set_Data_Length_Command,
HCI_LE_Set_Default_PHY_Command,
HCI_LE_Set_Extended_Scan_Enable_Command,
HCI_LE_Set_Extended_Scan_Parameters_Command,
HCI_LE_Set_Extended_Scan_Response_Data_Command,
HCI_LE_Set_Extended_Advertising_Data_Command,
HCI_LE_Set_Extended_Advertising_Enable_Command,
HCI_LE_Set_Extended_Advertising_Parameters_Command,
HCI_LE_Set_Host_Feature_Command,
HCI_LE_Set_PHY_Command,
HCI_LE_Set_Random_Address_Command,
HCI_LE_Set_Scan_Enable_Command,
@@ -138,7 +120,6 @@ from .hci import (
HCI_Switch_Role_Command,
HCI_Set_Connection_Encryption_Command,
HCI_StatusError,
HCI_SynchronousDataPacket,
HCI_User_Confirmation_Request_Negative_Reply_Command,
HCI_User_Confirmation_Request_Reply_Command,
HCI_User_Passkey_Request_Negative_Reply_Command,
@@ -170,11 +151,9 @@ from .core import (
from .utils import (
AsyncRunner,
CompositeEventEmitter,
EventWatcher,
setup_event_forwarding,
composite_listener,
deprecated,
experimental,
)
from .keys import (
KeyStore,
@@ -209,8 +188,6 @@ DEVICE_MIN_SCAN_WINDOW = 25
DEVICE_MAX_SCAN_WINDOW = 10240
DEVICE_MIN_LE_RSSI = -127
DEVICE_MAX_LE_RSSI = 20
DEVICE_MIN_EXTENDED_ADVERTISING_SET_HANDLE = 0x00
DEVICE_MAX_EXTENDED_ADVERTISING_SET_HANDLE = 0xEF
DEVICE_DEFAULT_ADDRESS = '00:00:00:00:00:00'
DEVICE_DEFAULT_ADVERTISING_INTERVAL = 1000 # ms
@@ -452,11 +429,8 @@ class LePhyOptions:
# -----------------------------------------------------------------------------
_PROXY_CLASS = TypeVar('_PROXY_CLASS', bound=gatt_client.ProfileServiceProxy)
class Peer:
def __init__(self, connection: Connection) -> None:
def __init__(self, connection):
self.connection = connection
# Create a GATT client for the connection
@@ -464,113 +438,77 @@ class Peer:
connection.gatt_client = self.gatt_client
@property
def services(self) -> List[gatt_client.ServiceProxy]:
def services(self):
return self.gatt_client.services
async def request_mtu(self, mtu: int) -> int:
async def request_mtu(self, mtu):
mtu = await self.gatt_client.request_mtu(mtu)
self.connection.emit('connection_att_mtu_update')
return mtu
async def discover_service(
self, uuid: Union[core.UUID, str]
) -> List[gatt_client.ServiceProxy]:
async def discover_service(self, uuid):
return await self.gatt_client.discover_service(uuid)
async def discover_services(
self, uuids: Iterable[core.UUID] = ()
) -> List[gatt_client.ServiceProxy]:
async def discover_services(self, uuids=()):
return await self.gatt_client.discover_services(uuids)
async def discover_included_services(
self, service: gatt_client.ServiceProxy
) -> List[gatt_client.ServiceProxy]:
async def discover_included_services(self, service):
return await self.gatt_client.discover_included_services(service)
async def discover_characteristics(
self,
uuids: Iterable[Union[core.UUID, str]] = (),
service: Optional[gatt_client.ServiceProxy] = None,
) -> List[gatt_client.CharacteristicProxy]:
async def discover_characteristics(self, uuids=(), service=None):
return await self.gatt_client.discover_characteristics(
uuids=uuids, service=service
)
async def discover_descriptors(
self,
characteristic: Optional[gatt_client.CharacteristicProxy] = None,
start_handle: Optional[int] = None,
end_handle: Optional[int] = None,
self, characteristic=None, start_handle=None, end_handle=None
):
return await self.gatt_client.discover_descriptors(
characteristic, start_handle, end_handle
)
async def discover_attributes(self) -> List[gatt_client.AttributeProxy]:
async def discover_attributes(self):
return await self.gatt_client.discover_attributes()
async def subscribe(
self,
characteristic: gatt_client.CharacteristicProxy,
subscriber: Optional[Callable[[bytes], Any]] = None,
prefer_notify: bool = True,
) -> None:
async def subscribe(self, characteristic, subscriber=None, prefer_notify=True):
return await self.gatt_client.subscribe(
characteristic, subscriber, prefer_notify
)
async def unsubscribe(
self,
characteristic: gatt_client.CharacteristicProxy,
subscriber: Optional[Callable[[bytes], Any]] = None,
) -> None:
async def unsubscribe(self, characteristic, subscriber=None):
return await self.gatt_client.unsubscribe(characteristic, subscriber)
async def read_value(
self, attribute: Union[int, gatt_client.AttributeProxy]
) -> bytes:
async def read_value(self, attribute):
return await self.gatt_client.read_value(attribute)
async def write_value(
self,
attribute: Union[int, gatt_client.AttributeProxy],
value: bytes,
with_response: bool = False,
) -> None:
async def write_value(self, attribute, value, with_response=False):
return await self.gatt_client.write_value(attribute, value, with_response)
async def read_characteristics_by_uuid(
self, uuid: core.UUID, service: Optional[gatt_client.ServiceProxy] = None
) -> List[bytes]:
async def read_characteristics_by_uuid(self, uuid, service=None):
return await self.gatt_client.read_characteristics_by_uuid(uuid, service)
def get_services_by_uuid(self, uuid: core.UUID) -> List[gatt_client.ServiceProxy]:
def get_services_by_uuid(self, uuid):
return self.gatt_client.get_services_by_uuid(uuid)
def get_characteristics_by_uuid(
self, uuid: core.UUID, service: Optional[gatt_client.ServiceProxy] = None
) -> List[gatt_client.CharacteristicProxy]:
def get_characteristics_by_uuid(self, uuid, service=None):
return self.gatt_client.get_characteristics_by_uuid(uuid, service)
def create_service_proxy(self, proxy_class: Type[_PROXY_CLASS]) -> _PROXY_CLASS:
return cast(_PROXY_CLASS, proxy_class.from_client(self.gatt_client))
def create_service_proxy(self, proxy_class):
return proxy_class.from_client(self.gatt_client)
async def discover_service_and_create_proxy(
self, proxy_class: Type[_PROXY_CLASS]
) -> Optional[_PROXY_CLASS]:
async def discover_service_and_create_proxy(self, proxy_class):
# Discover the first matching service and its characteristics
services = await self.discover_service(proxy_class.SERVICE_CLASS.UUID)
if services:
service = services[0]
await service.discover_characteristics()
return self.create_service_proxy(proxy_class)
return None
async def sustain(self, timeout: Optional[float] = None) -> None:
async def sustain(self, timeout=None):
await self.connection.sustain(timeout)
# [Classic only]
async def request_name(self) -> str:
async def request_name(self):
return await self.connection.request_remote_name()
async def __aenter__(self):
@@ -583,7 +521,7 @@ class Peer:
async def __aexit__(self, exc_type, exc_value, traceback):
pass
def __str__(self) -> str:
def __str__(self):
return f'{self.connection.peer_address} as {self.connection.role_name}'
@@ -602,46 +540,6 @@ class ConnectionParametersPreferences:
ConnectionParametersPreferences.default = ConnectionParametersPreferences()
# -----------------------------------------------------------------------------
@dataclass
class ScoLink(CompositeEventEmitter):
device: Device
acl_connection: Connection
handle: int
link_type: int
def __post_init__(self):
super().__init__()
async def disconnect(
self, reason: int = HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR
) -> None:
await self.device.disconnect(self, reason)
# -----------------------------------------------------------------------------
@dataclass
class CisLink(CompositeEventEmitter):
class State(IntEnum):
PENDING = 0
ESTABLISHED = 1
device: Device
acl_connection: Connection # Based ACL connection
handle: int # CIS handle assigned by Controller (in LE_Set_CIG_Parameters Complete or LE_CIS_Request events)
cis_id: int # CIS ID assigned by Central device
cig_id: int # CIG ID assigned by Central device
state: State = State.PENDING
def __post_init__(self):
super().__init__()
async def disconnect(
self, reason: int = HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR
) -> None:
await self.device.disconnect(self, reason)
# -----------------------------------------------------------------------------
class Connection(CompositeEventEmitter):
device: Device
@@ -823,7 +721,7 @@ class Connection(CompositeEventEmitter):
async def switch_role(self, role: int) -> None:
return await self.device.switch_role(self, role)
async def sustain(self, timeout: Optional[float] = None) -> None:
async def sustain(self, timeout=None):
"""Idles the current task waiting for a disconnect or timeout"""
abort = asyncio.get_running_loop().create_future()
@@ -838,9 +736,6 @@ class Connection(CompositeEventEmitter):
self.remove_listener('disconnection', abort.set_result)
self.remove_listener('disconnection_failure', abort.set_exception)
async def set_data_length(self, tx_octets, tx_time) -> None:
return await self.device.set_data_length(self, tx_octets, tx_time)
async def update_parameters(
self,
connection_interval_min,
@@ -920,7 +815,6 @@ class DeviceConfiguration:
self.keystore = None
self.gatt_services: List[Dict[str, Any]] = []
self.address_resolution_offload = False
self.cis_enabled = False
def load_from_dict(self, config: Dict[str, Any]) -> None:
# Load simple properties
@@ -956,7 +850,6 @@ class DeviceConfiguration:
self.address_resolution_offload = config.get(
'address_resolution_offload', self.address_resolution_offload
)
self.cis_enabled = config.get('cis_enabled', self.cis_enabled)
# Load or synthesize an IRK
irk = config.get('irk')
@@ -1063,10 +956,6 @@ class Device(CompositeEventEmitter):
]
advertisement_accumulators: Dict[Address, AdvertisementDataAccumulator]
config: DeviceConfiguration
extended_advertising_handles: Set[int]
sco_links: Dict[int, ScoLink]
cis_links: Dict[int, CisLink]
_pending_cis: Dict[int, Tuple[int, int]]
@composite_listener
class Listener:
@@ -1159,16 +1048,12 @@ class Device(CompositeEventEmitter):
self.disconnecting = False
self.connections = {} # Connections, by connection handle
self.pending_connections = {} # Connections, by BD address (BR/EDR only)
self.sco_links = {} # ScoLinks, by connection handle (BR/EDR only)
self.cis_links = {} # CisLinks, by connection handle (LE only)
self._pending_cis = {} # (CIS_ID, CIG_ID), by CIS_handle
self.classic_enabled = False
self.inquiry_response = None
self.address_resolver = None
self.classic_pending_accepts = {
Address.ANY: []
} # Futures, by BD address OR [Futures] for Address.ANY
self.extended_advertising_handles = set()
# Own address type cache
self.advertising_own_address_type = None
@@ -1191,7 +1076,6 @@ class Device(CompositeEventEmitter):
self.le_enabled = config.le_enabled
self.classic_enabled = config.classic_enabled
self.le_simultaneous_enabled = config.le_simultaneous_enabled
self.cis_enabled = config.cis_enabled
self.classic_sc_enabled = config.classic_sc_enabled
self.classic_ssp_enabled = config.classic_ssp_enabled
self.classic_smp_enabled = config.classic_smp_enabled
@@ -1502,16 +1386,6 @@ class Device(CompositeEventEmitter):
) # type: ignore[call-arg]
)
if self.cis_enabled:
await self.send_command(
HCI_LE_Set_Host_Feature_Command( # type: ignore[call-arg]
bit_number=(
HCI_CONNECTED_ISOCHRONOUS_STREAM_LE_SUPPORTED_FEATURE
),
bit_value=1,
)
)
if self.classic_enabled:
await self.send_command(
HCI_Write_Local_Name_Command(local_name=self.name.encode('utf8')) # type: ignore[call-arg]
@@ -1658,149 +1532,6 @@ class Device(CompositeEventEmitter):
self.advertising = False
self.auto_restart_advertising = False
@experimental('Extended Advertising is still experimental - Might be changed soon.')
async def start_extended_advertising(
self,
advertising_properties: HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties = HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING,
target: Address = Address.ANY,
own_address_type: int = OwnAddressType.RANDOM,
scan_response: Optional[bytes] = None,
advertising_data: Optional[bytes] = None,
) -> int:
"""Starts an extended advertising set.
Args:
advertising_properties: Properties to pass in HCI_LE_Set_Extended_Advertising_Parameters_Command
target: Directed advertising target. Directed property should be set in advertising_properties arg.
own_address_type: own address type to use in the advertising.
scan_response: raw scan response. When a non-none value is set, HCI_LE_Set_Extended_Scan_Response_Data_Command will be sent.
advertising_data: raw advertising data. When a non-none value is set, HCI_LE_Set_Advertising_Set_Random_Address_Command will be sent.
Returns:
Handle of the new advertising set.
"""
adv_handle = -1
# Find a free handle
for i in range(
DEVICE_MIN_EXTENDED_ADVERTISING_SET_HANDLE,
DEVICE_MAX_EXTENDED_ADVERTISING_SET_HANDLE + 1,
):
if i not in self.extended_advertising_handles:
adv_handle = i
break
if adv_handle == -1:
raise InvalidStateError('No available advertising set.')
try:
# Set the advertising parameters
await self.send_command(
HCI_LE_Set_Extended_Advertising_Parameters_Command(
advertising_handle=adv_handle,
advertising_event_properties=advertising_properties,
primary_advertising_interval_min=self.advertising_interval_min,
primary_advertising_interval_max=self.advertising_interval_max,
primary_advertising_channel_map=(
HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap.CHANNEL_37
| HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap.CHANNEL_38
| HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap.CHANNEL_39
),
own_address_type=own_address_type,
peer_address_type=target.address_type,
peer_address=target,
advertising_tx_power=7,
advertising_filter_policy=0,
primary_advertising_phy=1, # LE 1M
secondary_advertising_max_skip=0,
secondary_advertising_phy=1, # LE 1M
advertising_sid=0,
scan_request_notification_enable=0,
), # type: ignore[call-arg]
check_result=True,
)
# Set the advertising data if present
if advertising_data is not None:
await self.send_command(
HCI_LE_Set_Extended_Advertising_Data_Command(
advertising_handle=adv_handle,
operation=HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA,
fragment_preference=0x01, # Should not fragment
advertising_data=advertising_data,
), # type: ignore[call-arg]
check_result=True,
)
# Set the scan response if present
if scan_response is not None:
await self.send_command(
HCI_LE_Set_Extended_Scan_Response_Data_Command(
advertising_handle=adv_handle,
operation=HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA,
fragment_preference=0x01, # Should not fragment
scan_response_data=scan_response,
), # type: ignore[call-arg]
check_result=True,
)
if own_address_type in (
OwnAddressType.RANDOM,
OwnAddressType.RESOLVABLE_OR_RANDOM,
):
await self.send_command(
HCI_LE_Set_Advertising_Set_Random_Address_Command(
advertising_handle=adv_handle,
random_address=self.random_address,
), # type: ignore[call-arg]
check_result=True,
)
# Enable advertising
await self.send_command(
HCI_LE_Set_Extended_Advertising_Enable_Command(
enable=1,
advertising_handles=[adv_handle],
durations=[0], # Forever
max_extended_advertising_events=[0], # Infinite
), # type: ignore[call-arg]
check_result=True,
)
except HCI_Error as error:
# When any step fails, cleanup the advertising handle.
await self.send_command(
HCI_LE_Remove_Advertising_Set_Command(advertising_handle=adv_handle), # type: ignore[call-arg]
check_result=False,
)
raise error
self.extended_advertising_handles.add(adv_handle)
return adv_handle
@experimental('Extended Advertising is still experimental - Might be changed soon.')
async def stop_extended_advertising(self, adv_handle: int) -> None:
"""Stops an extended advertising set.
Args:
adv_handle: Handle of the advertising set to stop.
"""
# Disable advertising
await self.send_command(
HCI_LE_Set_Extended_Advertising_Enable_Command(
enable=0,
advertising_handles=[adv_handle],
durations=[0],
max_extended_advertising_events=[0],
), # type: ignore[call-arg]
check_result=True,
)
# Remove advertising set
await self.send_command(
HCI_LE_Remove_Advertising_Set_Command(advertising_handle=adv_handle), # type: ignore[call-arg]
check_result=True,
)
self.extended_advertising_handles.remove(adv_handle)
@property
def is_advertising(self):
return self.advertising
@@ -2435,9 +2166,7 @@ class Device(CompositeEventEmitter):
check_result=True,
)
async def disconnect(
self, connection: Union[Connection, ScoLink, CisLink], reason: int
) -> None:
async def disconnect(self, connection, reason):
# Create a future so that we can wait for the disconnection's result
pending_disconnection = asyncio.get_running_loop().create_future()
connection.on('disconnection', pending_disconnection.set_result)
@@ -2445,7 +2174,7 @@ class Device(CompositeEventEmitter):
# Request a disconnection
result = await self.send_command(
HCI_Disconnect_Command(connection_handle=connection.handle, reason=reason) # type: ignore[call-arg]
HCI_Disconnect_Command(connection_handle=connection.handle, reason=reason)
)
try:
@@ -2464,22 +2193,6 @@ class Device(CompositeEventEmitter):
)
self.disconnecting = False
async def set_data_length(self, connection, tx_octets, tx_time) -> None:
if tx_octets < 0x001B or tx_octets > 0x00FB:
raise ValueError('tx_octets must be between 0x001B and 0x00FB')
if tx_time < 0x0148 or tx_time > 0x4290:
raise ValueError('tx_time must be between 0x0148 and 0x4290')
return await self.send_command(
HCI_LE_Set_Data_Length_Command(
connection_handle=connection.handle,
tx_octets=tx_octets,
tx_time=tx_time,
), # type: ignore[call-arg]
check_result=True,
)
async def update_connection_parameters(
self,
connection,
@@ -2908,154 +2621,6 @@ class Device(CompositeEventEmitter):
self.remove_listener('remote_name', handler)
self.remove_listener('remote_name_failure', failure_handler)
# [LE only]
@experimental('Only for testing.')
async def setup_cig(
self,
cig_id: int,
cis_id: List[int],
sdu_interval: Tuple[int, int],
framing: int,
max_sdu: Tuple[int, int],
retransmission_number: int,
max_transport_latency: Tuple[int, int],
) -> List[int]:
"""Sends HCI_LE_Set_CIG_Parameters_Command.
Args:
cig_id: CIG_ID.
cis_id: CID ID list.
sdu_interval: SDU intervals of (Central->Peripheral, Peripheral->Cental).
framing: Un-framing(0) or Framing(1).
max_sdu: Max SDU counts of (Central->Peripheral, Peripheral->Cental).
retransmission_number: retransmission_number.
max_transport_latency: Max transport latencies of
(Central->Peripheral, Peripheral->Cental).
Returns:
List of created CIS handles corresponding to the same order of [cid_id].
"""
num_cis = len(cis_id)
response = await self.send_command(
HCI_LE_Set_CIG_Parameters_Command( # type: ignore[call-arg]
cig_id=cig_id,
sdu_interval_c_to_p=sdu_interval[0],
sdu_interval_p_to_c=sdu_interval[1],
worst_case_sca=0x00, # 251-500 ppm
packing=0x00, # Sequential
framing=framing,
max_transport_latency_c_to_p=max_transport_latency[0],
max_transport_latency_p_to_c=max_transport_latency[1],
cis_id=cis_id,
max_sdu_c_to_p=[max_sdu[0]] * num_cis,
max_sdu_p_to_c=[max_sdu[1]] * num_cis,
phy_c_to_p=[HCI_LE_2M_PHY] * num_cis,
phy_p_to_c=[HCI_LE_2M_PHY] * num_cis,
rtn_c_to_p=[retransmission_number] * num_cis,
rtn_p_to_c=[retransmission_number] * num_cis,
),
check_result=True,
)
# Ideally, we should manage CIG lifecycle, but they are not useful for Unicast
# Server, so here it only provides a basic functionality for testing.
cis_handles = response.return_parameters.connection_handle[:]
for id, cis_handle in zip(cis_id, cis_handles):
self._pending_cis[cis_handle] = (id, cig_id)
return cis_handles
# [LE only]
@experimental('Only for testing.')
async def create_cis(self, cis_acl_pairs: List[Tuple[int, int]]) -> List[CisLink]:
for cis_handle, acl_handle in cis_acl_pairs:
acl_connection = self.lookup_connection(acl_handle)
assert acl_connection
cis_id, cig_id = self._pending_cis.pop(cis_handle)
self.cis_links[cis_handle] = CisLink(
device=self,
acl_connection=acl_connection,
handle=cis_handle,
cis_id=cis_id,
cig_id=cig_id,
)
result = await self.send_command(
HCI_LE_Create_CIS_Command( # type: ignore[call-arg]
cis_connection_handle=[p[0] for p in cis_acl_pairs],
acl_connection_handle=[p[1] for p in cis_acl_pairs],
),
)
if result.status != HCI_COMMAND_STATUS_PENDING:
logger.warning(
'HCI_LE_Create_CIS_Command failed: '
f'{HCI_Constant.error_name(result.status)}'
)
raise HCI_StatusError(result)
pending_cis_establishments: Dict[int, asyncio.Future[CisLink]] = {}
for cis_handle, _ in cis_acl_pairs:
pending_cis_establishments[
cis_handle
] = asyncio.get_running_loop().create_future()
with closing(EventWatcher()) as watcher:
@watcher.on(self, 'cis_establishment')
def on_cis_establishment(cis_link: CisLink) -> None:
if pending_future := pending_cis_establishments.get(
cis_link.handle, None
):
pending_future.set_result(cis_link)
return await asyncio.gather(*pending_cis_establishments.values())
# [LE only]
@experimental('Only for testing.')
async def accept_cis_request(self, handle: int) -> CisLink:
result = await self.send_command(
HCI_LE_Accept_CIS_Request_Command( # type: ignore[call-arg]
connection_handle=handle
),
)
if result.status != HCI_COMMAND_STATUS_PENDING:
logger.warning(
'HCI_LE_Accept_CIS_Request_Command failed: '
f'{HCI_Constant.error_name(result.status)}'
)
raise HCI_StatusError(result)
pending_cis_establishment = asyncio.get_running_loop().create_future()
with closing(EventWatcher()) as watcher:
@watcher.on(self, 'cis_establishment')
def on_cis_establishment(cis_link: CisLink) -> None:
if cis_link.handle == handle:
pending_cis_establishment.set_result(cis_link)
return await pending_cis_establishment
# [LE only]
@experimental('Only for testing.')
async def reject_cis_request(
self,
handle: int,
reason: int = HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR,
) -> None:
result = await self.send_command(
HCI_LE_Reject_CIS_Request_Command( # type: ignore[call-arg]
connection_handle=handle, reason=reason
),
)
if result.status != HCI_COMMAND_STATUS_PENDING:
logger.warning(
'HCI_LE_Reject_CIS_Request_Command failed: '
f'{HCI_Constant.error_name(result.status)}'
)
raise HCI_StatusError(result)
@host_event_handler
def on_flush(self):
self.emit('flush')
@@ -3260,35 +2825,30 @@ class Device(CompositeEventEmitter):
)
@host_event_handler
def on_disconnection(self, connection_handle: int, reason: int) -> None:
if connection := self.connections.pop(connection_handle, None):
logger.debug(
f'*** Disconnection: [0x{connection.handle:04X}] '
f'{connection.peer_address} as {connection.role_name}, reason={reason}'
)
connection.emit('disconnection', reason)
@with_connection_from_handle
def on_disconnection(self, connection, reason):
logger.debug(
f'*** Disconnection: [0x{connection.handle:04X}] '
f'{connection.peer_address} as {connection.role_name}, reason={reason}'
)
connection.emit('disconnection', reason)
# Cleanup subsystems that maintain per-connection state
self.gatt_server.on_disconnection(connection)
# Remove the connection from the map
del self.connections[connection.handle]
# Restart advertising if auto-restart is enabled
if self.auto_restart_advertising:
logger.debug('restarting advertising')
self.abort_on(
'flush',
self.start_advertising(
advertising_type=self.advertising_type, # type: ignore[arg-type]
own_address_type=self.advertising_own_address_type, # type: ignore[arg-type]
auto_restart=True,
),
)
elif sco_link := self.sco_links.pop(connection_handle, None):
sco_link.emit('disconnection', reason)
elif cis_link := self.cis_links.pop(connection_handle, None):
cis_link.emit('disconnection', reason)
else:
logger.error(
f'*** Unknown disconnection handle=0x{connection_handle}, reason={reason} ***'
# Cleanup subsystems that maintain per-connection state
self.gatt_server.on_disconnection(connection)
# Restart advertising if auto-restart is enabled
if self.auto_restart_advertising:
logger.debug('restarting advertising')
self.abort_on(
'flush',
self.start_advertising(
advertising_type=self.advertising_type,
own_address_type=self.advertising_own_address_type,
auto_restart=True,
),
)
@host_event_handler
@@ -3567,107 +3127,6 @@ class Device(CompositeEventEmitter):
connection.emit('remote_name_failure', error)
self.emit('remote_name_failure', address, error)
# [Classic only]
@host_event_handler
@with_connection_from_address
@experimental('Only for testing.')
def on_sco_connection(
self, acl_connection: Connection, sco_handle: int, link_type: int
) -> None:
logger.debug(
f'*** SCO connected: {acl_connection.peer_address}, '
f'sco_handle=[0x{sco_handle:04X}], '
f'link_type=[0x{link_type:02X}] ***'
)
sco_link = self.sco_links[sco_handle] = ScoLink(
device=self,
acl_connection=acl_connection,
handle=sco_handle,
link_type=link_type,
)
self.emit('sco_connection', sco_link)
# [Classic only]
@host_event_handler
@with_connection_from_address
@experimental('Only for testing.')
def on_sco_connection_failure(
self, acl_connection: Connection, status: int
) -> None:
logger.debug(f'*** SCO connection failure: {acl_connection.peer_address}***')
self.emit('sco_connection_failure')
# [Classic only]
@host_event_handler
@experimental('Only for testing')
def on_sco_packet(self, sco_handle: int, packet: HCI_SynchronousDataPacket) -> None:
if sco_link := self.sco_links.get(sco_handle, None):
sco_link.emit('pdu', packet)
# [LE only]
@host_event_handler
@with_connection_from_handle
@experimental('Only for testing')
def on_cis_request(
self,
acl_connection: Connection,
cis_handle: int,
cig_id: int,
cis_id: int,
) -> None:
logger.debug(
f'*** CIS Request '
f'acl_handle=[0x{acl_connection.handle:04X}]{acl_connection.peer_address}, '
f'cis_handle=[0x{cis_handle:04X}], '
f'cig_id=[0x{cig_id:02X}], '
f'cis_id=[0x{cis_id:02X}] ***'
)
# LE_CIS_Established event doesn't provide info, so we must store them here.
self.cis_links[cis_handle] = CisLink(
device=self,
acl_connection=acl_connection,
handle=cis_handle,
cig_id=cig_id,
cis_id=cis_id,
)
self.emit('cis_request', acl_connection, cis_handle, cig_id, cis_id)
# [LE only]
@host_event_handler
@experimental('Only for testing')
def on_cis_establishment(self, cis_handle: int) -> None:
cis_link = self.cis_links[cis_handle]
cis_link.state = CisLink.State.ESTABLISHED
assert cis_link.acl_connection
logger.debug(
f'*** CIS Establishment '
f'{cis_link.acl_connection.peer_address}, '
f'cis_handle=[0x{cis_handle:04X}], '
f'cig_id=[0x{cis_link.cig_id:02X}], '
f'cis_id=[0x{cis_link.cis_id:02X}] ***'
)
cis_link.emit('establishment')
self.emit('cis_establishment', cis_link)
# [LE only]
@host_event_handler
@experimental('Only for testing')
def on_cis_establishment_failure(self, cis_handle: int, status: int) -> None:
logger.debug(f'*** CIS Establishment Failure: cis=[0x{cis_handle:04X}] ***')
if cis_link := self.cis_links.pop(cis_handle, None):
cis_link.emit('establishment_failure')
self.emit('cis_establishment_failure', cis_handle, status)
# [LE only]
@host_event_handler
@experimental('Only for testing')
def on_iso_packet(self, handle: int, packet: HCI_IsoDataPacket) -> None:
if cis_link := self.cis_links.get(handle, None):
cis_link.emit('pdu', packet)
@host_event_handler
@with_connection_from_handle
def on_connection_encryption_change(self, connection, encryption):
@@ -3679,18 +3138,10 @@ class Device(CompositeEventEmitter):
connection.encryption = encryption
if (
not connection.authenticated
and connection.transport == BT_BR_EDR_TRANSPORT
and encryption == HCI_Encryption_Change_Event.AES_CCM
):
connection.authenticated = True
connection.sc = True
if (
not connection.authenticated
and connection.transport == BT_LE_TRANSPORT
and encryption == HCI_Encryption_Change_Event.E0_OR_AES_CCM
):
connection.authenticated = True
connection.sc = True
connection.emit('connection_encryption_change')
@host_event_handler
+6 -116
View File
@@ -93,35 +93,20 @@ GATT_RECONNECTION_CONFIGURATION_SERVICE = UUID.from_16_bits(0x1829, 'Reconne
GATT_INSULIN_DELIVERY_SERVICE = UUID.from_16_bits(0x183A, 'Insulin Delivery')
GATT_BINARY_SENSOR_SERVICE = UUID.from_16_bits(0x183B, 'Binary Sensor')
GATT_EMERGENCY_CONFIGURATION_SERVICE = UUID.from_16_bits(0x183C, 'Emergency Configuration')
GATT_AUTHORIZATION_CONTROL_SERVICE = UUID.from_16_bits(0x183D, 'Authorization Control')
GATT_PHYSICAL_ACTIVITY_MONITOR_SERVICE = UUID.from_16_bits(0x183E, 'Physical Activity Monitor')
GATT_ELAPSED_TIME_SERVICE = UUID.from_16_bits(0x183F, 'Elapsed Time')
GATT_GENERIC_HEALTH_SENSOR_SERVICE = UUID.from_16_bits(0x1840, 'Generic Health Sensor')
GATT_AUDIO_INPUT_CONTROL_SERVICE = UUID.from_16_bits(0x1843, 'Audio Input Control')
GATT_VOLUME_CONTROL_SERVICE = UUID.from_16_bits(0x1844, 'Volume Control')
GATT_VOLUME_OFFSET_CONTROL_SERVICE = UUID.from_16_bits(0x1845, 'Volume Offset Control')
GATT_COORDINATED_SET_IDENTIFICATION_SERVICE = UUID.from_16_bits(0x1846, 'Coordinated Set Identification')
GATT_COORDINATED_SET_IDENTIFICATION_SERVICE = UUID.from_16_bits(0x1846, 'Coordinated Set Identification Service')
GATT_DEVICE_TIME_SERVICE = UUID.from_16_bits(0x1847, 'Device Time')
GATT_MEDIA_CONTROL_SERVICE = UUID.from_16_bits(0x1848, 'Media Control')
GATT_GENERIC_MEDIA_CONTROL_SERVICE = UUID.from_16_bits(0x1849, 'Generic Media Control')
GATT_MEDIA_CONTROL_SERVICE = UUID.from_16_bits(0x1848, 'Media Control Service')
GATT_GENERIC_MEDIA_CONTROL_SERVICE = UUID.from_16_bits(0x1849, 'Generic Media Control Service')
GATT_CONSTANT_TONE_EXTENSION_SERVICE = UUID.from_16_bits(0x184A, 'Constant Tone Extension')
GATT_TELEPHONE_BEARER_SERVICE = UUID.from_16_bits(0x184B, 'Telephone Bearer')
GATT_GENERIC_TELEPHONE_BEARER_SERVICE = UUID.from_16_bits(0x184C, 'Generic Telephone Bearer')
GATT_TELEPHONE_BEARER_SERVICE = UUID.from_16_bits(0x184B, 'Telephone Bearer Service')
GATT_GENERIC_TELEPHONE_BEARER_SERVICE = UUID.from_16_bits(0x184C, 'Generic Telephone Bearer Service')
GATT_MICROPHONE_CONTROL_SERVICE = UUID.from_16_bits(0x184D, 'Microphone Control')
GATT_AUDIO_STREAM_CONTROL_SERVICE = UUID.from_16_bits(0x184E, 'Audio Stream Control')
GATT_BROADCAST_AUDIO_SCAN_SERVICE = UUID.from_16_bits(0x184F, 'Broadcast Audio Scan')
GATT_PUBLISHED_AUDIO_CAPABILITIES_SERVICE = UUID.from_16_bits(0x1850, 'Published Audio Capabilities')
GATT_BASIC_AUDIO_ANNOUNCEMENT_SERVICE = UUID.from_16_bits(0x1851, 'Basic Audio Announcement')
GATT_BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE = UUID.from_16_bits(0x1852, 'Broadcast Audio Announcement')
GATT_COMMON_AUDIO_SERVICE = UUID.from_16_bits(0x1853, 'Common Audio')
GATT_HEARING_ACCESS_SERVICE = UUID.from_16_bits(0x1854, 'Hearing Access')
GATT_TELEPHONY_AND_MEDIA_AUDIO_SERVICE = UUID.from_16_bits(0x1855, 'Telephony and Media Audio')
GATT_PUBLIC_BROADCAST_ANNOUNCEMENT_SERVICE = UUID.from_16_bits(0x1856, 'Public Broadcast Announcement')
GATT_ELECTRONIC_SHELF_LABEL_SERVICE = UUID.from_16_bits(0X1857, 'Electronic Shelf Label')
GATT_GAMING_AUDIO_SERVICE = UUID.from_16_bits(0x1858, 'Gaming Audio')
GATT_MESH_PROXY_SOLICITATION_SERVICE = UUID.from_16_bits(0x1859, 'Mesh Audio Solicitation')
# Attribute Types
# Types
GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE = UUID.from_16_bits(0x2800, 'Primary Service')
GATT_SECONDARY_SERVICE_ATTRIBUTE_TYPE = UUID.from_16_bits(0x2801, 'Secondary Service')
GATT_INCLUDE_ATTRIBUTE_TYPE = UUID.from_16_bits(0x2802, 'Include')
@@ -144,8 +129,6 @@ GATT_ENVIRONMENTAL_SENSING_MEASUREMENT_DESCRIPTOR = UUID.from_16_bits(0x290C,
GATT_ENVIRONMENTAL_SENSING_TRIGGER_DESCRIPTOR = UUID.from_16_bits(0x290D, 'Environmental Sensing Trigger Setting')
GATT_TIME_TRIGGER_DESCRIPTOR = UUID.from_16_bits(0x290E, 'Time Trigger Setting')
GATT_COMPLETE_BR_EDR_TRANSPORT_BLOCK_DATA_DESCRIPTOR = UUID.from_16_bits(0x290F, 'Complete BR-EDR Transport Block Data')
GATT_OBSERVATION_SCHEDULE_DESCRIPTOR = UUID.from_16_bits(0x290F, 'Observation Schedule')
GATT_VALID_RANGE_AND_ACCURACY_DESCRIPTOR = UUID.from_16_bits(0x290F, 'Valid Range And Accuracy')
# Device Information Service
GATT_SYSTEM_ID_CHARACTERISTIC = UUID.from_16_bits(0x2A23, 'System ID')
@@ -173,96 +156,6 @@ GATT_HEART_RATE_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2A39, 'Heart
# Battery Service
GATT_BATTERY_LEVEL_CHARACTERISTIC = UUID.from_16_bits(0x2A19, 'Battery Level')
# Telephony And Media Audio Service (TMAS)
GATT_TMAP_ROLE_CHARACTERISTIC = UUID.from_16_bits(0x2B51, 'TMAP Role')
# Audio Input Control Service (AICS)
GATT_AUDIO_INPUT_STATE_CHARACTERISTIC = UUID.from_16_bits(0x2B77, 'Audio Input State')
GATT_GAIN_SETTINGS_ATTRIBUTE_CHARACTERISTIC = UUID.from_16_bits(0x2B78, 'Gain Settings Attribute')
GATT_AUDIO_INPUT_TYPE_CHARACTERISTIC = UUID.from_16_bits(0x2B79, 'Audio Input Type')
GATT_AUDIO_INPUT_STATUS_CHARACTERISTIC = UUID.from_16_bits(0x2B7A, 'Audio Input Status')
GATT_AUDIO_INPUT_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2B7B, 'Audio Input Control Point')
GATT_AUDIO_INPUT_DESCRIPTION_CHARACTERISTIC = UUID.from_16_bits(0x2B7C, 'Audio Input Description')
# Volume Control Service (VCS)
GATT_VOLUME_STATE_CHARACTERISTIC = UUID.from_16_bits(0x2B7D, 'Volume State')
GATT_VOLUME_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2B7E, 'Volume Control Point')
GATT_VOLUME_FLAGS_CHARACTERISTIC = UUID.from_16_bits(0x2B7F, 'Volume Flags')
# Volume Offset Control Service (VOCS)
GATT_VOLUME_OFFSET_STATE_CHARACTERISTIC = UUID.from_16_bits(0x2B80, 'Volume Offset State')
GATT_AUDIO_LOCATION_CHARACTERISTIC = UUID.from_16_bits(0x2B81, 'Audio Location')
GATT_VOLUME_OFFSET_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2B82, 'Volume Offset Control Point')
GATT_AUDIO_OUTPUT_DESCRIPTION_CHARACTERISTIC = UUID.from_16_bits(0x2B83, 'Audio Output Description')
# Coordinated Set Identification Service (CSIS)
GATT_SET_IDENTITY_RESOLVING_KEY_CHARACTERISTIC = UUID.from_16_bits(0x2B84, 'Set Identity Resolving Key')
GATT_COORDINATED_SET_SIZE_CHARACTERISTIC = UUID.from_16_bits(0x2B85, 'Coordinated Set Size')
GATT_SET_MEMBER_LOCK_CHARACTERISTIC = UUID.from_16_bits(0x2B86, 'Set Member Lock')
GATT_SET_MEMBER_RANK_CHARACTERISTIC = UUID.from_16_bits(0x2B87, 'Set Member Rank')
# Media Control Service (MCS)
GATT_MEDIA_PLAYER_NAME_CHARACTERISTIC = UUID.from_16_bits(0x2B93, 'Media Player Name')
GATT_MEDIA_PLAYER_ICON_OBJECT_ID_CHARACTERISTIC = UUID.from_16_bits(0x2B94, 'Media Player Icon Object ID')
GATT_MEDIA_PLAYER_ICON_URL_CHARACTERISTIC = UUID.from_16_bits(0x2B95, 'Media Player Icon URL')
GATT_TRACK_CHANGED_CHARACTERISTIC = UUID.from_16_bits(0x2B96, 'Track Changed')
GATT_TRACK_TITLE_CHARACTERISTIC = UUID.from_16_bits(0x2B97, 'Track Title')
GATT_TRACK_DURATION_CHARACTERISTIC = UUID.from_16_bits(0x2B98, 'Track Duration')
GATT_TRACK_POSITION_CHARACTERISTIC = UUID.from_16_bits(0x2B99, 'Track Position')
GATT_PLAYBACK_SPEED_CHARACTERISTIC = UUID.from_16_bits(0x2B9A, 'Playback Speed')
GATT_SEEKING_SPEED_CHARACTERISTIC = UUID.from_16_bits(0x2B9B, 'Seeking Speed')
GATT_CURRENT_TRACK_SEGMENTS_OBJECT_ID_CHARACTERISTIC = UUID.from_16_bits(0x2B9C, 'Current Track Segments Object ID')
GATT_CURRENT_TRACK_OBJECT_ID_CHARACTERISTIC = UUID.from_16_bits(0x2B9D, 'Current Track Object ID')
GATT_NEXT_TRACK_OBJECT_ID_CHARACTERISTIC = UUID.from_16_bits(0x2B9E, 'Next Track Object ID')
GATT_PARENT_GROUP_OBJECT_ID_CHARACTERISTIC = UUID.from_16_bits(0x2B9F, 'Parent Group Object ID')
GATT_CURRENT_GROUP_OBJECT_ID_CHARACTERISTIC = UUID.from_16_bits(0x2BA0, 'Current Group Object ID')
GATT_PLAYING_ORDER_CHARACTERISTIC = UUID.from_16_bits(0x2BA1, 'Playing Order')
GATT_PLAYING_ORDERS_SUPPORTED_CHARACTERISTIC = UUID.from_16_bits(0x2BA2, 'Playing Orders Supported')
GATT_MEDIA_STATE_CHARACTERISTIC = UUID.from_16_bits(0x2BA3, 'Media State')
GATT_MEDIA_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2BA4, 'Media Control Point')
GATT_MEDIA_CONTROL_POINT_OPCODES_SUPPORTED_CHARACTERISTIC = UUID.from_16_bits(0x2BA5, 'Media Control Point Opcodes Supported')
GATT_SEARCH_RESULTS_OBJECT_ID_CHARACTERISTIC = UUID.from_16_bits(0x2BA6, 'Search Results Object ID')
GATT_SEARCH_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2BA7, 'Search Control Point')
GATT_CONTENT_CONTROL_ID_CHARACTERISTIC = UUID.from_16_bits(0x2BBA, 'Content Control Id')
# Telephone Bearer Service (TBS)
GATT_BEARER_PROVIDER_NAME_CHARACTERISTIC = UUID.from_16_bits(0x2BB4, 'Bearer Provider Name')
GATT_BEARER_UCI_CHARACTERISTIC = UUID.from_16_bits(0x2BB5, 'Bearer UCI')
GATT_BEARER_TECHNOLOGY_CHARACTERISTIC = UUID.from_16_bits(0x2BB6, 'Bearer Technology')
GATT_BEARER_URI_SCHEMES_SUPPORTED_LIST_CHARACTERISTIC = UUID.from_16_bits(0x2BB7, 'Bearer URI Schemes Supported List')
GATT_BEARER_SIGNAL_STRENGTH_CHARACTERISTIC = UUID.from_16_bits(0x2BB8, 'Bearer Signal Strength')
GATT_BEARER_SIGNAL_STRENGTH_REPORTING_INTERVAL_CHARACTERISTIC = UUID.from_16_bits(0x2BB9, 'Bearer Signal Strength Reporting Interval')
GATT_BEARER_LIST_CURRENT_CALLS_CHARACTERISTIC = UUID.from_16_bits(0x2BBA, 'Bearer List Current Calls')
GATT_CONTENT_CONTROL_ID_CHARACTERISTIC = UUID.from_16_bits(0x2BBB, 'Content Control ID')
GATT_STATUS_FLAGS_CHARACTERISTIC = UUID.from_16_bits(0x2BBC, 'Status Flags')
GATT_INCOMING_CALL_TARGET_BEARER_URI_CHARACTERISTIC = UUID.from_16_bits(0x2BBD, 'Incoming Call Target Bearer URI')
GATT_CALL_STATE_CHARACTERISTIC = UUID.from_16_bits(0x2BBE, 'Call State')
GATT_CALL_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2BBF, 'Call Control Point')
GATT_CALL_CONTROL_POINT_OPTIONAL_OPCODES_CHARACTERISTIC = UUID.from_16_bits(0x2BC0, 'Call Control Point Optional Opcodes')
GATT_TERMINATION_REASON_CHARACTERISTIC = UUID.from_16_bits(0x2BC1, 'Termination Reason')
GATT_INCOMING_CALL_CHARACTERISTIC = UUID.from_16_bits(0x2BC2, 'Incoming Call')
GATT_CALL_FRIENDLY_NAME_CHARACTERISTIC = UUID.from_16_bits(0x2BC3, 'Call Friendly Name')
# Microphone Control Service (MICS)
GATT_MUTE_CHARACTERISTIC = UUID.from_16_bits(0x2BC3, 'Mute')
# Audio Stream Control Service (ASCS)
GATT_SINK_ASE_CHARACTERISTIC = UUID.from_16_bits(0x2BC4, 'Sink ASE')
GATT_SOURCE_ASE_CHARACTERISTIC = UUID.from_16_bits(0x2BC5, 'Source ASE')
GATT_ASE_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2BC6, 'ASE Control Point')
# Broadcast Audio Scan Service (BASS)
GATT_BROADCAST_AUDIO_SCAN_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2BC7, 'Broadcast Audio Scan Control Point')
GATT_BROADCAST_RECEIVE_STATE_CHARACTERISTIC = UUID.from_16_bits(0x2BC8, 'Broadcast Receive State')
# Published Audio Capabilities Service (PACS)
GATT_SINK_PAC_CHARACTERISTIC = UUID.from_16_bits(0x2BC9, 'Sink PAC')
GATT_SINK_AUDIO_LOCATION_CHARACTERISTIC = UUID.from_16_bits(0x2BCA, 'Sink Audio Location')
GATT_SOURCE_PAC_CHARACTERISTIC = UUID.from_16_bits(0x2BCB, 'Source PAC')
GATT_SOURCE_AUDIO_LOCATION_CHARACTERISTIC = UUID.from_16_bits(0x2BCC, 'Source Audio Location')
GATT_AVAILABLE_AUDIO_CONTEXTS_CHARACTERISTIC = UUID.from_16_bits(0x2BCD, 'Available Audio Contexts')
GATT_SUPPORTED_AUDIO_CONTEXTS_CHARACTERISTIC = UUID.from_16_bits(0x2BCE, 'Supported Audio Contexts')
# ASHA Service
GATT_ASHA_SERVICE = UUID.from_16_bits(0xFDF0, 'Audio Streaming for Hearing Aid')
GATT_ASHA_READ_ONLY_PROPERTIES_CHARACTERISTIC = UUID('6333651e-c481-4a3e-9169-7c902aad37bb', 'ReadOnlyProperties')
@@ -284,9 +177,6 @@ GATT_BOOT_KEYBOARD_INPUT_REPORT_CHARACTERISTIC = UUID.from_16_bi
GATT_CURRENT_TIME_CHARACTERISTIC = UUID.from_16_bits(0x2A2B, 'Current Time')
GATT_BOOT_KEYBOARD_OUTPUT_REPORT_CHARACTERISTIC = UUID.from_16_bits(0x2A32, 'Boot Keyboard Output Report')
GATT_CENTRAL_ADDRESS_RESOLUTION__CHARACTERISTIC = UUID.from_16_bits(0x2AA6, 'Central Address Resolution')
GATT_CLIENT_SUPPORTED_FEATURES_CHARACTERISTIC = UUID.from_16_bits(0x2B29, 'Client Supported Features')
GATT_DATABASE_HASH_CHARACTERISTIC = UUID.from_16_bits(0x2B2A, 'Database Hash')
GATT_SERVER_SUPPORTED_FEATURES_CHARACTERISTIC = UUID.from_16_bits(0x2B3A, 'Server Supported Features')
# fmt: on
# pylint: enable=line-too-long
+20 -56
View File
@@ -38,7 +38,6 @@ from typing import (
Any,
Iterable,
Type,
Set,
TYPE_CHECKING,
)
@@ -129,7 +128,7 @@ class ServiceProxy(AttributeProxy):
included_services: List[ServiceProxy]
@staticmethod
def from_client(service_class, client: Client, service_uuid: UUID):
def from_client(service_class, client, service_uuid):
# The service and its characteristics are considered to have already been
# discovered
services = client.get_services_by_uuid(service_uuid)
@@ -207,11 +206,11 @@ class CharacteristicProxy(AttributeProxy):
return await self.client.subscribe(self, subscriber, prefer_notify)
async def unsubscribe(self, subscriber=None, force=False):
async def unsubscribe(self, subscriber=None):
if subscriber in self.subscribers:
subscriber = self.subscribers.pop(subscriber)
return await self.client.unsubscribe(self, subscriber, force)
return await self.client.unsubscribe(self, subscriber)
def __str__(self) -> str:
return (
@@ -247,12 +246,8 @@ class ProfileServiceProxy:
class Client:
services: List[ServiceProxy]
cached_values: Dict[int, Tuple[datetime, bytes]]
notification_subscribers: Dict[
int, Set[Union[CharacteristicProxy, Callable[[bytes], Any]]]
]
indication_subscribers: Dict[
int, Set[Union[CharacteristicProxy, Callable[[bytes], Any]]]
]
notification_subscribers: Dict[int, Callable[[bytes], Any]]
indication_subscribers: Dict[int, Callable[[bytes], Any]]
pending_response: Optional[asyncio.futures.Future[ATT_PDU]]
pending_request: Optional[ATT_PDU]
@@ -262,8 +257,10 @@ class Client:
self.request_semaphore = asyncio.Semaphore(1)
self.pending_request = None
self.pending_response = None
self.notification_subscribers = {} # Subscriber set, by attribute handle
self.indication_subscribers = {} # Subscriber set, by attribute handle
self.notification_subscribers = (
{}
) # Notification subscribers, by attribute handle
self.indication_subscribers = {} # Indication subscribers, by attribute handle
self.services = []
self.cached_values = {}
@@ -685,8 +682,8 @@ class Client:
async def discover_descriptors(
self,
characteristic: Optional[CharacteristicProxy] = None,
start_handle: Optional[int] = None,
end_handle: Optional[int] = None,
start_handle=None,
end_handle=None,
) -> List[DescriptorProxy]:
'''
See Vol 3, Part G - 4.7.1 Discover All Characteristic Descriptors
@@ -792,12 +789,7 @@ class Client:
return attributes
async def subscribe(
self,
characteristic: CharacteristicProxy,
subscriber: Optional[Callable[[bytes], Any]] = None,
prefer_notify: bool = True,
) -> None:
async def subscribe(self, characteristic, subscriber=None, prefer_notify=True):
# If we haven't already discovered the descriptors for this characteristic,
# do it now
if not characteristic.descriptors_discovered:
@@ -834,7 +826,6 @@ class Client:
subscriber_set = subscribers.setdefault(characteristic.handle, set())
if subscriber is not None:
subscriber_set.add(subscriber)
# Add the characteristic as a subscriber, which will result in the
# characteristic emitting an 'update' event when a notification or indication
# is received
@@ -842,18 +833,7 @@ class Client:
await self.write_value(cccd, struct.pack('<H', bits), with_response=True)
async def unsubscribe(
self,
characteristic: CharacteristicProxy,
subscriber: Optional[Callable[[bytes], Any]] = None,
force: bool = False,
) -> None:
'''
Unsubscribe from a characteristic.
If `force` is True, this will write zeros to the CCCD when there are no
subscribers left, even if there were already no registered subscribers.
'''
async def unsubscribe(self, characteristic, subscriber=None):
# If we haven't already discovered the descriptors for this characteristic,
# do it now
if not characteristic.descriptors_discovered:
@@ -867,45 +847,31 @@ class Client:
logger.warning('unsubscribing from characteristic with no CCCD descriptor')
return
# Check if the characteristic has subscribers
if not (
characteristic.handle in self.notification_subscribers
or characteristic.handle in self.indication_subscribers
):
if not force:
return
# Remove the subscriber(s)
if subscriber is not None:
# Remove matching subscriber from subscriber sets
for subscriber_set in (
self.notification_subscribers,
self.indication_subscribers,
):
if (
subscribers := subscriber_set.get(characteristic.handle)
) and subscriber in subscribers:
subscribers = subscriber_set.get(characteristic.handle, [])
if subscriber in subscribers:
subscribers.remove(subscriber)
# Cleanup if we removed the last one
if not subscribers:
del subscriber_set[characteristic.handle]
else:
# Remove all subscribers for this attribute from the sets
# Remove all subscribers for this attribute from the sets!
self.notification_subscribers.pop(characteristic.handle, None)
self.indication_subscribers.pop(characteristic.handle, None)
# Update the CCCD
if not (
characteristic.handle in self.notification_subscribers
or characteristic.handle in self.indication_subscribers
):
if not self.notification_subscribers and not self.indication_subscribers:
# No more subscribers left
await self.write_value(cccd, b'\x00\x00', with_response=True)
async def read_value(
self, attribute: Union[int, AttributeProxy], no_long_read: bool = False
) -> bytes:
) -> Any:
'''
See Vol 3, Part G - 4.8.1 Read Characteristic Value
@@ -1101,7 +1067,7 @@ class Client:
def on_att_handle_value_notification(self, notification):
# Call all subscribers
subscribers = self.notification_subscribers.get(
notification.attribute_handle, set()
notification.attribute_handle, []
)
if not subscribers:
logger.warning('!!! received notification with no subscriber')
@@ -1115,9 +1081,7 @@ class Client:
def on_att_handle_value_indication(self, indication):
# Call all subscribers
subscribers = self.indication_subscribers.get(
indication.attribute_handle, set()
)
subscribers = self.indication_subscribers.get(indication.attribute_handle, [])
if not subscribers:
logger.warning('!!! received indication with no subscriber')
+79 -513
View File
@@ -17,12 +17,10 @@
# -----------------------------------------------------------------------------
from __future__ import annotations
import collections
import dataclasses
import enum
import functools
import logging
import struct
from typing import Any, Dict, Callable, Optional, Type, Union, List
from typing import Any, Dict, Callable, Optional, Type, Union
from .colors import color
from .core import (
@@ -150,7 +148,6 @@ HCI_COMMAND_PACKET = 0x01
HCI_ACL_DATA_PACKET = 0x02
HCI_SYNCHRONOUS_DATA_PACKET = 0x03
HCI_EVENT_PACKET = 0x04
HCI_ISO_DATA_PACKET = 0x05
# HCI Event Codes
HCI_INQUIRY_COMPLETE_EVENT = 0x01
@@ -1373,7 +1370,6 @@ HCI_LE_SUPPORTED_FEATURES_NAMES = {
if feature_name.startswith('HCI_') and feature_name.endswith('_LE_SUPPORTED_FEATURE')
}
# fmt: on
# pylint: enable=line-too-long
# pylint: disable=invalid-name
@@ -1383,45 +1379,6 @@ HCI_LE_SUPPORTED_FEATURES_NAMES = {
STATUS_SPEC = {'size': 1, 'mapper': lambda x: HCI_Constant.status_name(x)}
class CodecID(enum.IntEnum):
# fmt: off
U_LOG = 0x00
A_LOG = 0x01
CVSD = 0x02
TRANSPARENT = 0x03
LINEAR_PCM = 0x04
MSBC = 0x05
LC3 = 0x06
G729A = 0x07
VENDOR_SPECIFIC = 0xFF
@dataclasses.dataclass(frozen=True)
class CodingFormat:
codec_id: CodecID
company_id: int = 0
vendor_specific_codec_id: int = 0
@classmethod
def parse_from_bytes(cls, data: bytes, offset: int):
(codec_id, company_id, vendor_specific_codec_id) = struct.unpack_from(
'<BHH', data, offset
)
return offset + 5, cls(
codec_id=CodecID(codec_id),
company_id=company_id,
vendor_specific_codec_id=vendor_specific_codec_id,
)
def to_bytes(self) -> bytes:
return struct.pack(
'<BHH', self.codec_id, self.company_id, self.vendor_specific_codec_id
)
def __bytes__(self) -> bytes:
return self.to_bytes()
# -----------------------------------------------------------------------------
class HCI_Constant:
@staticmethod
@@ -1517,12 +1474,6 @@ class HCI_Object:
# The rest of the bytes
field_value = data[offset:]
return (field_value, len(field_value))
if field_type == 'v':
# Variable-length bytes field, with 1-byte length at the beginning
field_length = data[offset]
offset += 1
field_value = data[offset : offset + field_length]
return (field_value, field_length + 1)
if field_type == 1:
# 8-bit unsigned
return (data[offset], 1)
@@ -1627,11 +1578,6 @@ class HCI_Object:
raise ValueError('value too large for *-typed field')
else:
field_bytes = bytes(field_value)
elif field_type == 'v':
# Variable-length bytes field, with 1-byte length at the beginning
field_bytes = bytes(field_bytes)
field_length = len(field_bytes)
field_bytes = bytes([field_length]) + field_bytes
elif isinstance(field_value, (bytes, bytearray)) or hasattr(
field_value, 'to_bytes'
):
@@ -1939,7 +1885,6 @@ Address.NIL = Address(b"\xff\xff\xff\xff\xff\xff", Address.PUBLIC_DEVICE_ADDRESS
Address.ANY = Address(b"\x00\x00\x00\x00\x00\x00", Address.PUBLIC_DEVICE_ADDRESS)
Address.ANY_RANDOM = Address(b"\x00\x00\x00\x00\x00\x00", Address.RANDOM_DEVICE_ADDRESS)
# -----------------------------------------------------------------------------
class OwnAddressType:
PUBLIC = 0
@@ -1980,9 +1925,6 @@ class HCI_Packet:
if packet_type == HCI_ACL_DATA_PACKET:
return HCI_AclDataPacket.from_bytes(packet)
if packet_type == HCI_SYNCHRONOUS_DATA_PACKET:
return HCI_SynchronousDataPacket.from_bytes(packet)
if packet_type == HCI_EVENT_PACKET:
return HCI_Event.from_bytes(packet)
@@ -2351,19 +2293,6 @@ class HCI_Read_Clock_Offset_Command(HCI_Command):
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(
fields=[
('bd_addr', Address.parse_address),
('reason', {'size': 1, 'mapper': HCI_Constant.error_name}),
],
)
class HCI_Reject_Synchronous_Connection_Request_Command(HCI_Command):
'''
See Bluetooth spec @ 7.1.28 Reject Synchronous Connection Request Command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(
fields=[
@@ -2497,14 +2426,14 @@ class HCI_IO_Capability_Request_Negative_Reply_Command(HCI_Command):
('connection_handle', 2),
('transmit_bandwidth', 4),
('receive_bandwidth', 4),
('transmit_coding_format', CodingFormat.parse_from_bytes),
('receive_coding_format', CodingFormat.parse_from_bytes),
('transmit_coding_format', 5),
('receive_coding_format', 5),
('transmit_codec_frame_size', 2),
('receive_codec_frame_size', 2),
('input_bandwidth', 4),
('output_bandwidth', 4),
('input_coding_format', CodingFormat.parse_from_bytes),
('output_coding_format', CodingFormat.parse_from_bytes),
('input_coding_format', 5),
('output_coding_format', 5),
('input_coded_data_size', 2),
('output_coded_data_size', 2),
('input_pcm_data_format', 1),
@@ -2525,35 +2454,6 @@ class HCI_Enhanced_Setup_Synchronous_Connection_Command(HCI_Command):
See Bluetooth spec @ 7.1.45 Enhanced Setup Synchronous Connection Command
'''
class PcmDataFormat(enum.IntEnum):
NA = 0x00
ONES_COMPLEMENT = 0x01
TWOS_COMPLEMENT = 0x02
SIGN_MAGNITUDE = 0x03
UNSIGNED = 0x04
class DataPath(enum.IntEnum):
HCI = 0x00
PCM = 0x01
class RetransmissionEffort(enum.IntEnum):
NO_RETRANSMISSION = 0x00
OPTIMIZE_FOR_POWER = 0x01
OPTIMIZE_FOR_QUALITY = 0x02
DONT_CARE = 0xFF
class PacketType(enum.IntFlag):
HV1 = 0x0001
HV2 = 0x0002
HV3 = 0x0004
EV3 = 0x0008
EV4 = 0x0010
EV5 = 0x0020
NO_2_EV3 = 0x0040
NO_3_EV3 = 0x0080
NO_2_EV5 = 0x0100
NO_3_EV5 = 0x0200
# -----------------------------------------------------------------------------
@HCI_Command.command(
@@ -2561,14 +2461,14 @@ class HCI_Enhanced_Setup_Synchronous_Connection_Command(HCI_Command):
('bd_addr', Address.parse_address),
('transmit_bandwidth', 4),
('receive_bandwidth', 4),
('transmit_coding_format', CodingFormat.parse_from_bytes),
('receive_coding_format', CodingFormat.parse_from_bytes),
('transmit_coding_format', 5),
('receive_coding_format', 5),
('transmit_codec_frame_size', 2),
('receive_codec_frame_size', 2),
('input_bandwidth', 4),
('output_bandwidth', 4),
('input_coding_format', CodingFormat.parse_from_bytes),
('output_coding_format', CodingFormat.parse_from_bytes),
('input_coding_format', 5),
('output_coding_format', 5),
('input_coded_data_size', 2),
('output_coded_data_size', 2),
('input_pcm_data_format', 1),
@@ -3865,10 +3765,8 @@ class HCI_LE_Set_Advertising_Set_Random_Address_Command(HCI_Command):
'advertising_event_properties',
{
'size': 2,
'mapper': lambda x: str(
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties(
x
)
'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Parameters_Command.advertising_properties_string(
x
),
},
),
@@ -3878,8 +3776,8 @@ class HCI_LE_Set_Advertising_Set_Random_Address_Command(HCI_Command):
'primary_advertising_channel_map',
{
'size': 1,
'mapper': lambda x: str(
HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap(x)
'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Parameters_Command.channel_map_string(
x
),
},
),
@@ -3901,33 +3799,38 @@ class HCI_LE_Set_Extended_Advertising_Parameters_Command(HCI_Command):
See Bluetooth spec @ 7.8.53 LE Set Extended Advertising Parameters Command
'''
class AdvertisingProperties(enum.IntFlag):
CONNECTABLE_ADVERTISING = 1 << 0
SCANNABLE_ADVERTISING = 1 << 1
DIRECTED_ADVERTISING = 1 << 2
HIGH_DUTY_CYCLE_DIRECTED_CONNECTABLE_ADVERTISING = 1 << 3
USE_LEGACY_ADVERTISING_PDUS = 1 << 4
ANONYMOUS_ADVERTISING = 1 << 5
INCLUDE_TX_POWER = 1 << 6
CONNECTABLE_ADVERTISING = 0
SCANNABLE_ADVERTISING = 1
DIRECTED_ADVERTISING = 2
HIGH_DUTY_CYCLE_DIRECTED_CONNECTABLE_ADVERTISING = 3
USE_LEGACY_ADVERTISING_PDUS = 4
ANONYMOUS_ADVERTISING = 5
INCLUDE_TX_POWER = 6
def __str__(self) -> str:
return '|'.join(
flag.name
for flag in HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties
if self.value & flag.value and flag.name is not None
)
ADVERTISING_PROPERTIES_NAMES = (
'CONNECTABLE_ADVERTISING',
'SCANNABLE_ADVERTISING',
'DIRECTED_ADVERTISING',
'HIGH_DUTY_CYCLE_DIRECTED_CONNECTABLE_ADVERTISING',
'USE_LEGACY_ADVERTISING_PDUS',
'ANONYMOUS_ADVERTISING',
'INCLUDE_TX_POWER',
)
class ChannelMap(enum.IntFlag):
CHANNEL_37 = 1 << 0
CHANNEL_38 = 1 << 1
CHANNEL_39 = 1 << 2
CHANNEL_37 = 0
CHANNEL_38 = 1
CHANNEL_39 = 2
def __str__(self) -> str:
return '|'.join(
flag.name
for flag in HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap
if self.value & flag.value and flag.name is not None
)
CHANNEL_NAMES = ('37', '38', '39')
@classmethod
def advertising_properties_string(cls, properties):
# pylint: disable=line-too-long
return f'[{",".join(bit_flags_to_strings(properties, cls.ADVERTISING_PROPERTIES_NAMES))}]'
@classmethod
def channel_map_string(cls, channel_map):
return f'[{",".join(bit_flags_to_strings(channel_map, cls.CHANNEL_NAMES))}]'
# -----------------------------------------------------------------------------
@@ -3939,9 +3842,9 @@ class HCI_LE_Set_Extended_Advertising_Parameters_Command(HCI_Command):
'operation',
{
'size': 1,
'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Data_Command.Operation(
'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Data_Command.operation_name(
x
).name,
),
},
),
('fragment_preference', 1),
@@ -3959,12 +3862,23 @@ class HCI_LE_Set_Extended_Advertising_Data_Command(HCI_Command):
See Bluetooth spec @ 7.8.54 LE Set Extended Advertising Data Command
'''
class Operation(enum.IntEnum):
INTERMEDIATE_FRAGMENT = 0x00
FIRST_FRAGMENT = 0x01
LAST_FRAGMENT = 0x02
COMPLETE_DATA = 0x03
UNCHANGED_DATA = 0x04
INTERMEDIATE_FRAGMENT = 0x00
FIRST_FRAGMENT = 0x01
LAST_FRAGMENT = 0x02
COMPLETE_DATA = 0x03
UNCHANGED_DATA = 0x04
OPERATION_NAMES = {
INTERMEDIATE_FRAGMENT: 'INTERMEDIATE_FRAGMENT',
FIRST_FRAGMENT: 'FIRST_FRAGMENT',
LAST_FRAGMENT: 'LAST_FRAGMENT',
COMPLETE_DATA: 'COMPLETE_DATA',
UNCHANGED_DATA: 'UNCHANGED_DATA',
}
@classmethod
def operation_name(cls, operation):
return name_or_number(cls.OPERATION_NAMES, operation)
# -----------------------------------------------------------------------------
@@ -3976,9 +3890,9 @@ class HCI_LE_Set_Extended_Advertising_Data_Command(HCI_Command):
'operation',
{
'size': 1,
'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Data_Command.Operation(
'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Data_Command.operation_name(
x
).name,
),
},
),
('fragment_preference', 1),
@@ -3996,6 +3910,22 @@ class HCI_LE_Set_Extended_Scan_Response_Data_Command(HCI_Command):
See Bluetooth spec @ 7.8.55 LE Set Extended Scan Response Data Command
'''
INTERMEDIATE_FRAGMENT = 0x00
FIRST_FRAGMENT = 0x01
LAST_FRAGMENT = 0x02
COMPLETE_DATA = 0x03
OPERATION_NAMES = {
INTERMEDIATE_FRAGMENT: 'INTERMEDIATE_FRAGMENT',
FIRST_FRAGMENT: 'FIRST_FRAGMENT',
LAST_FRAGMENT: 'LAST_FRAGMENT',
COMPLETE_DATA: 'COMPLETE_DATA',
}
@classmethod
def operation_name(cls, operation):
return name_or_number(cls.OPERATION_NAMES, operation)
# -----------------------------------------------------------------------------
@HCI_Command.command(
@@ -4393,162 +4323,6 @@ class HCI_LE_Set_Host_Feature_Command(HCI_Command):
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(
fields=[
('cig_id', 1),
('sdu_interval_c_to_p', 3),
('sdu_interval_p_to_c', 3),
('worst_case_sca', 1),
('packing', 1),
('framing', 1),
('max_transport_latency_c_to_p', 2),
('max_transport_latency_p_to_c', 2),
[
('cis_id', 1),
('max_sdu_c_to_p', 2),
('max_sdu_p_to_c', 2),
('phy_c_to_p', 1),
('phy_p_to_c', 1),
('rtn_c_to_p', 1),
('rtn_p_to_c', 1),
],
],
return_parameters_fields=[
('status', STATUS_SPEC),
('cig_id', 1),
[('connection_handle', 2)],
],
)
class HCI_LE_Set_CIG_Parameters_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.97 LE Set CIG Parameters Command
'''
cig_id: int
sdu_interval_c_to_p: int
sdu_interval_p_to_c: int
worst_case_sca: int
packing: int
framing: int
max_transport_latency_c_to_p: int
max_transport_latency_p_to_c: int
cis_id: List[int]
max_sdu_c_to_p: List[int]
max_sdu_p_to_c: List[int]
phy_c_to_p: List[int]
phy_p_to_c: List[int]
rtn_c_to_p: List[int]
rtn_p_to_c: List[int]
# -----------------------------------------------------------------------------
@HCI_Command.command(
fields=[
[
('cis_connection_handle', 2),
('acl_connection_handle', 2),
],
],
)
class HCI_LE_Create_CIS_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.99 LE Create CIS command
'''
cis_connection_handle: List[int]
acl_connection_handle: List[int]
# -----------------------------------------------------------------------------
@HCI_Command.command(
fields=[('cig_id', 1)],
return_parameters_fields=[('status', STATUS_SPEC), ('cig_id', 1)],
)
class HCI_LE_Remove_CIG_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.100 LE Remove CIG command
'''
cig_id: int
# -----------------------------------------------------------------------------
@HCI_Command.command(
fields=[('connection_handle', 2)],
)
class HCI_LE_Accept_CIS_Request_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.101 LE Accept CIS Request command
'''
connection_handle: int
# -----------------------------------------------------------------------------
@HCI_Command.command(
fields=[
('connection_handle', 2),
('reason', {'size': 1, 'mapper': HCI_Constant.error_name}),
],
)
class HCI_LE_Reject_CIS_Request_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.102 LE Reject CIS Request command
'''
connection_handle: int
reason: int
# -----------------------------------------------------------------------------
@HCI_Command.command(
fields=[
('connection_handle', 2),
('data_path_direction', 1),
('data_path_id', 1),
('codec_id', CodingFormat.parse_from_bytes),
('controller_delay', 3),
('codec_configuration', 'v'),
],
return_parameters_fields=[
('status', STATUS_SPEC),
('connection_handle', 2),
],
)
class HCI_LE_Setup_ISO_Data_Path_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.109 LE Setup ISO Data Path command
'''
connection_handle: int
data_path_direction: int
data_path_id: int
codec_id: CodingFormat
controller_delay: int
codec_configuration: bytes
# -----------------------------------------------------------------------------
@HCI_Command.command(
fields=[
('connection_handle', 2),
('data_path_direction', 1),
],
return_parameters_fields=[
('status', STATUS_SPEC),
('connection_handle', 2),
],
)
class HCI_LE_Remove_ISO_Data_Path_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.110 LE Remove ISO Data Path command
'''
connection_handle: int
data_path_direction: int
# -----------------------------------------------------------------------------
# HCI Events
# -----------------------------------------------------------------------------
@@ -5168,48 +4942,6 @@ class HCI_LE_Channel_Selection_Algorithm_Event(HCI_LE_Meta_Event):
'''
# -----------------------------------------------------------------------------
@HCI_LE_Meta_Event.event(
[
('status', STATUS_SPEC),
('connection_handle', 2),
('cig_sync_delay', 3),
('cis_sync_delay', 3),
('transport_latency_c_to_p', 3),
('transport_latency_p_to_c', 3),
('phy_c_to_p', 1),
('phy_p_to_c', 1),
('nse', 1),
('bn_c_to_p', 1),
('bn_p_to_c', 1),
('ft_c_to_p', 1),
('ft_p_to_c', 1),
('max_pdu_c_to_p', 2),
('max_pdu_p_to_c', 2),
('iso_interval', 2),
]
)
class HCI_LE_CIS_Established_Event(HCI_LE_Meta_Event):
'''
See Bluetooth spec @ 7.7.65.25 LE CIS Established Event
'''
# -----------------------------------------------------------------------------
@HCI_LE_Meta_Event.event(
[
('acl_connection_handle', 2),
('cis_connection_handle', 2),
('cig_id', 1),
('cis_id', 1),
]
)
class HCI_LE_CIS_Request_Event(HCI_LE_Meta_Event):
'''
See Bluetooth spec @ 7.7.65.26 LE CIS Request Event
'''
# -----------------------------------------------------------------------------
@HCI_Event.event([('status', STATUS_SPEC)])
class HCI_Inquiry_Complete_Event(HCI_Event):
@@ -5336,10 +5068,6 @@ class HCI_Disconnection_Complete_Event(HCI_Event):
See Bluetooth spec @ 7.7.5 Disconnection Complete Event
'''
status: int
connection_handle: int
reason: int
# -----------------------------------------------------------------------------
@HCI_Event.event([('status', STATUS_SPEC), ('connection_handle', 2)])
@@ -6010,168 +5738,6 @@ class HCI_AclDataPacket(HCI_Packet):
)
# -----------------------------------------------------------------------------
class HCI_SynchronousDataPacket(HCI_Packet):
'''
See Bluetooth spec @ 5.4.3 HCI SCO Data Packets
'''
hci_packet_type = HCI_SYNCHRONOUS_DATA_PACKET
@staticmethod
def from_bytes(packet: bytes) -> HCI_SynchronousDataPacket:
# Read the header
h, data_total_length = struct.unpack_from('<HB', packet, 1)
connection_handle = h & 0xFFF
packet_status = (h >> 12) & 0b11
data = packet[4:]
if len(data) != data_total_length:
raise ValueError(
f'invalid packet length {len(data)} != {data_total_length}'
)
return HCI_SynchronousDataPacket(
connection_handle, packet_status, data_total_length, data
)
def to_bytes(self) -> bytes:
h = (self.packet_status << 12) | self.connection_handle
return (
struct.pack('<BHB', HCI_SYNCHRONOUS_DATA_PACKET, h, self.data_total_length)
+ self.data
)
def __init__(
self,
connection_handle: int,
packet_status: int,
data_total_length: int,
data: bytes,
) -> None:
self.connection_handle = connection_handle
self.packet_status = packet_status
self.data_total_length = data_total_length
self.data = data
def __bytes__(self) -> bytes:
return self.to_bytes()
def __str__(self) -> str:
return (
f'{color("SCO", "blue")}: '
f'handle=0x{self.connection_handle:04x}, '
f'ps={self.packet_status}, '
f'data_total_length={self.data_total_length}, '
f'data={self.data.hex()}'
)
# -----------------------------------------------------------------------------
class HCI_IsoDataPacket(HCI_Packet):
'''
See Bluetooth spec @ 5.4.5 HCI ISO Data Packets
'''
hci_packet_type = HCI_ISO_DATA_PACKET
@staticmethod
def from_bytes(packet: bytes) -> HCI_IsoDataPacket:
time_stamp: Optional[int] = None
packet_sequence_number: Optional[int] = None
iso_sdu_length: Optional[int] = None
packet_status_flag: Optional[int] = None
pos = 1
pdu_info, data_total_length = struct.unpack_from('<HH', packet, pos)
connection_handle = pdu_info & 0xFFF
pb_flag = (pdu_info >> 12) & 0b11
ts_flag = (pdu_info >> 14) & 0b01
pos += 4
# pb_flag in (0b00, 0b10) but faster
should_include_sdu_info = not (pb_flag & 0b01)
if ts_flag:
if not should_include_sdu_info:
logger.warn(f'Timestamp included when pb_flag={bin(pb_flag)}')
time_stamp, _ = struct.unpack_from('<I', packet, pos)
pos += 4
if should_include_sdu_info:
packet_sequence_number, sdu_info = struct.unpack_from('<HH', packet, pos)
iso_sdu_length = sdu_info & 0xFFF
packet_status_flag = sdu_info >> 14
pos += 4
iso_sdu_fragment = packet[pos:]
return HCI_IsoDataPacket(
connection_handle=connection_handle,
pb_flag=pb_flag,
ts_flag=ts_flag,
data_total_length=data_total_length,
time_stamp=time_stamp,
packet_sequence_number=packet_sequence_number,
iso_sdu_length=iso_sdu_length,
packet_status_flag=packet_status_flag,
iso_sdu_fragment=iso_sdu_fragment,
)
def __init__(
self,
connection_handle: int,
pb_flag: int,
ts_flag: int,
data_total_length: int,
time_stamp: Optional[int],
packet_sequence_number: Optional[int],
iso_sdu_length: Optional[int],
packet_status_flag: Optional[int],
iso_sdu_fragment: bytes,
) -> None:
self.connection_handle = connection_handle
self.pb_flag = pb_flag
self.ts_flag = ts_flag
self.data_total_length = data_total_length
self.time_stamp = time_stamp
self.packet_sequence_number = packet_sequence_number
self.iso_sdu_length = iso_sdu_length
self.packet_status_flag = packet_status_flag
self.iso_sdu_fragment = iso_sdu_fragment
def __bytes__(self) -> bytes:
return self.to_bytes()
def to_bytes(self) -> bytes:
fmt = '<BHH'
args = [
HCI_ISO_DATA_PACKET,
self.ts_flag << 14 | self.pb_flag << 12 | self.connection_handle,
self.data_total_length,
]
if self.time_stamp is not None:
fmt += 'I'
args.append(self.time_stamp)
if (
self.packet_sequence_number is not None
and self.iso_sdu_length is not None
and self.packet_status_flag is not None
):
fmt += 'HH'
args += [
self.packet_sequence_number,
self.iso_sdu_length | self.packet_status_flag << 14,
]
return struct.pack(fmt, args) + self.iso_sdu_fragment
def __str__(self) -> str:
return (
f'{color("ISO", "blue")}: '
f'handle=0x{self.connection_handle:04x}, '
f'ps={self.packet_status_flag}, '
f'data_total_length={self.data_total_length}, '
f'sdu={self.iso_sdu_fragment.hex()}'
)
# -----------------------------------------------------------------------------
class HCI_AclDataPacketAssembler:
current_data: Optional[bytes]
+42 -67
View File
@@ -15,39 +15,30 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
from collections.abc import Callable, MutableMapping
from typing import cast, Any
import logging
from bumble import avdtp
from bumble.colors import color
from bumble.att import ATT_CID, ATT_PDU
from bumble.smp import SMP_CID, SMP_Command
from bumble.core import name_or_number
from bumble.l2cap import (
from .colors import color
from .att import ATT_CID, ATT_PDU
from .smp import SMP_CID, SMP_Command
from .core import name_or_number
from .l2cap import (
L2CAP_PDU,
L2CAP_CONNECTION_REQUEST,
L2CAP_CONNECTION_RESPONSE,
L2CAP_SIGNALING_CID,
L2CAP_LE_SIGNALING_CID,
L2CAP_Control_Frame,
L2CAP_Connection_Request,
L2CAP_Connection_Response,
)
from bumble.hci import (
from .hci import (
HCI_EVENT_PACKET,
HCI_ACL_DATA_PACKET,
HCI_DISCONNECTION_COMPLETE_EVENT,
HCI_AclDataPacketAssembler,
HCI_Packet,
HCI_Event,
HCI_AclDataPacket,
HCI_Disconnection_Complete_Event,
)
from bumble.rfcomm import RFCOMM_Frame, RFCOMM_PSM
from bumble.sdp import SDP_PDU, SDP_PSM
from .rfcomm import RFCOMM_Frame, RFCOMM_PSM
from .sdp import SDP_PDU, SDP_PSM
from .avdtp import MessageAssembler as AVDTP_MessageAssembler, AVDTP_PSM
# -----------------------------------------------------------------------------
# Logging
@@ -59,25 +50,23 @@ logger = logging.getLogger(__name__)
PSM_NAMES = {
RFCOMM_PSM: 'RFCOMM',
SDP_PSM: 'SDP',
avdtp.AVDTP_PSM: 'AVDTP',
AVDTP_PSM: 'AVDTP'
# TODO: add more PSM values
}
# -----------------------------------------------------------------------------
class PacketTracer:
class AclStream:
psms: MutableMapping[int, int]
peer: PacketTracer.AclStream
avdtp_assemblers: MutableMapping[int, avdtp.MessageAssembler]
def __init__(self, analyzer: PacketTracer.Analyzer) -> None:
def __init__(self, analyzer):
self.analyzer = analyzer
self.packet_assembler = HCI_AclDataPacketAssembler(self.on_acl_pdu)
self.avdtp_assemblers = {} # AVDTP assemblers, by source_cid
self.psms = {} # PSM, by source_cid
self.peer = None # ACL stream in the other direction
# pylint: disable=too-many-nested-blocks
def on_acl_pdu(self, pdu: bytes) -> None:
def on_acl_pdu(self, pdu):
l2cap_pdu = L2CAP_PDU.from_bytes(pdu)
if l2cap_pdu.cid == ATT_CID:
@@ -92,30 +81,26 @@ class PacketTracer:
# Check if this signals a new channel
if control_frame.code == L2CAP_CONNECTION_REQUEST:
connection_request = cast(L2CAP_Connection_Request, control_frame)
self.psms[connection_request.source_cid] = connection_request.psm
self.psms[control_frame.source_cid] = control_frame.psm
elif control_frame.code == L2CAP_CONNECTION_RESPONSE:
connection_response = cast(L2CAP_Connection_Response, control_frame)
if (
connection_response.result
control_frame.result
== L2CAP_Connection_Response.CONNECTION_SUCCESSFUL
):
if self.peer:
if psm := self.peer.psms.get(
connection_response.source_cid
):
if psm := self.peer.psms.get(control_frame.source_cid):
# Found a pending connection
self.psms[connection_response.destination_cid] = psm
self.psms[control_frame.destination_cid] = psm
# For AVDTP connections, create a packet assembler for
# each direction
if psm == avdtp.AVDTP_PSM:
if psm == AVDTP_PSM:
self.avdtp_assemblers[
connection_response.source_cid
] = avdtp.MessageAssembler(self.on_avdtp_message)
control_frame.source_cid
] = AVDTP_MessageAssembler(self.on_avdtp_message)
self.peer.avdtp_assemblers[
connection_response.destination_cid
] = avdtp.MessageAssembler(
control_frame.destination_cid
] = AVDTP_MessageAssembler(
self.peer.on_avdtp_message
)
@@ -128,7 +113,7 @@ class PacketTracer:
elif psm == RFCOMM_PSM:
rfcomm_frame = RFCOMM_Frame.from_bytes(l2cap_pdu.payload)
self.analyzer.emit(rfcomm_frame)
elif psm == avdtp.AVDTP_PSM:
elif psm == AVDTP_PSM:
self.analyzer.emit(
f'{color("L2CAP", "green")} [CID={l2cap_pdu.cid}, '
f'PSM=AVDTP]: {l2cap_pdu.payload.hex()}'
@@ -145,26 +130,22 @@ class PacketTracer:
else:
self.analyzer.emit(l2cap_pdu)
def on_avdtp_message(
self, transaction_label: int, message: avdtp.Message
) -> None:
def on_avdtp_message(self, transaction_label, message):
self.analyzer.emit(
f'{color("AVDTP", "green")} [{transaction_label}] {message}'
)
def feed_packet(self, packet: HCI_AclDataPacket) -> None:
def feed_packet(self, packet):
self.packet_assembler.feed_packet(packet)
class Analyzer:
acl_streams: MutableMapping[int, PacketTracer.AclStream]
peer: PacketTracer.Analyzer
def __init__(self, label: str, emit_message: Callable[..., None]) -> None:
def __init__(self, label, emit_message):
self.label = label
self.emit_message = emit_message
self.acl_streams = {} # ACL streams, by connection handle
self.peer = None # Analyzer in the other direction
def start_acl_stream(self, connection_handle: int) -> PacketTracer.AclStream:
def start_acl_stream(self, connection_handle):
logger.info(
f'[{self.label}] +++ Creating ACL stream for connection '
f'0x{connection_handle:04X}'
@@ -179,7 +160,7 @@ class PacketTracer:
return stream
def end_acl_stream(self, connection_handle: int) -> None:
def end_acl_stream(self, connection_handle):
if connection_handle in self.acl_streams:
logger.info(
f'[{self.label}] --- Removing ACL stream for connection '
@@ -190,29 +171,23 @@ class PacketTracer:
# Let the other forwarder know so it can cleanup its stream as well
self.peer.end_acl_stream(connection_handle)
def on_packet(self, packet: HCI_Packet) -> None:
def on_packet(self, packet):
self.emit(packet)
if packet.hci_packet_type == HCI_ACL_DATA_PACKET:
acl_packet = cast(HCI_AclDataPacket, packet)
# Look for an existing stream for this handle, create one if it is the
# first ACL packet for that connection handle
if (
stream := self.acl_streams.get(acl_packet.connection_handle)
) is None:
stream = self.start_acl_stream(acl_packet.connection_handle)
stream.feed_packet(acl_packet)
if (stream := self.acl_streams.get(packet.connection_handle)) is None:
stream = self.start_acl_stream(packet.connection_handle)
stream.feed_packet(packet)
elif packet.hci_packet_type == HCI_EVENT_PACKET:
event_packet = cast(HCI_Event, packet)
if event_packet.event_code == HCI_DISCONNECTION_COMPLETE_EVENT:
self.end_acl_stream(
cast(HCI_Disconnection_Complete_Event, packet).connection_handle
)
if packet.event_code == HCI_DISCONNECTION_COMPLETE_EVENT:
self.end_acl_stream(packet.connection_handle)
def emit(self, message: Any) -> None:
def emit(self, message):
self.emit_message(f'[{self.label}] {message}')
def trace(self, packet: HCI_Packet, direction: int = 0) -> None:
def trace(self, packet, direction=0):
if direction == 0:
self.host_to_controller_analyzer.on_packet(packet)
else:
@@ -220,10 +195,10 @@ class PacketTracer:
def __init__(
self,
host_to_controller_label: str = color('HOST->CONTROLLER', 'blue'),
controller_to_host_label: str = color('CONTROLLER->HOST', 'cyan'),
emit_message: Callable[..., None] = logger.info,
) -> None:
host_to_controller_label=color('HOST->CONTROLLER', 'blue'),
controller_to_host_label=color('CONTROLLER->HOST', 'cyan'),
emit_message=logger.info,
):
self.host_to_controller_analyzer = PacketTracer.Analyzer(
host_to_controller_label, emit_message
)
+1 -179
View File
@@ -22,7 +22,7 @@ import dataclasses
import enum
import traceback
import warnings
from typing import Dict, List, Union, Set, Any, TYPE_CHECKING
from typing import Dict, List, Union, Set, TYPE_CHECKING
from . import at
from . import rfcomm
@@ -35,11 +35,6 @@ from bumble.core import (
BT_L2CAP_PROTOCOL_ID,
BT_RFCOMM_PROTOCOL_ID,
)
from bumble.hci import (
HCI_Enhanced_Setup_Synchronous_Connection_Command,
CodingFormat,
CodecID,
)
from bumble.sdp import (
DataElement,
ServiceAttribute,
@@ -70,7 +65,6 @@ class HfpProtocolError(ProtocolError):
# Protocol Support
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
class HfpProtocol:
dlc: rfcomm.DLC
@@ -825,175 +819,3 @@ def sdp_records(
DataElement.unsigned_integer_16(hf_supported_features),
),
]
# -----------------------------------------------------------------------------
# ESCO Codec Default Parameters
# -----------------------------------------------------------------------------
# Hands-Free Profile v1.8, 5.7 Codec Interoperability Requirements
class DefaultCodecParameters(enum.IntEnum):
SCO_CVSD_D0 = enum.auto()
SCO_CVSD_D1 = enum.auto()
ESCO_CVSD_S1 = enum.auto()
ESCO_CVSD_S2 = enum.auto()
ESCO_CVSD_S3 = enum.auto()
ESCO_CVSD_S4 = enum.auto()
ESCO_MSBC_T1 = enum.auto()
ESCO_MSBC_T2 = enum.auto()
@dataclasses.dataclass
class EscoParameters:
# Codec specific
transmit_coding_format: CodingFormat
receive_coding_format: CodingFormat
packet_type: HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType
retransmission_effort: HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort
max_latency: int
# Common
input_coding_format: CodingFormat = CodingFormat(CodecID.LINEAR_PCM)
output_coding_format: CodingFormat = CodingFormat(CodecID.LINEAR_PCM)
input_coded_data_size: int = 16
output_coded_data_size: int = 16
input_pcm_data_format: HCI_Enhanced_Setup_Synchronous_Connection_Command.PcmDataFormat = (
HCI_Enhanced_Setup_Synchronous_Connection_Command.PcmDataFormat.TWOS_COMPLEMENT
)
output_pcm_data_format: HCI_Enhanced_Setup_Synchronous_Connection_Command.PcmDataFormat = (
HCI_Enhanced_Setup_Synchronous_Connection_Command.PcmDataFormat.TWOS_COMPLEMENT
)
input_pcm_sample_payload_msb_position: int = 0
output_pcm_sample_payload_msb_position: int = 0
input_data_path: HCI_Enhanced_Setup_Synchronous_Connection_Command.DataPath = (
HCI_Enhanced_Setup_Synchronous_Connection_Command.DataPath.HCI
)
output_data_path: HCI_Enhanced_Setup_Synchronous_Connection_Command.DataPath = (
HCI_Enhanced_Setup_Synchronous_Connection_Command.DataPath.HCI
)
input_transport_unit_size: int = 0
output_transport_unit_size: int = 0
input_bandwidth: int = 16000
output_bandwidth: int = 16000
transmit_bandwidth: int = 8000
receive_bandwidth: int = 8000
transmit_codec_frame_size: int = 60
receive_codec_frame_size: int = 60
def asdict(self) -> Dict[str, Any]:
# dataclasses.asdict() will recursively deep-copy the entire object,
# which is expensive and breaks CodingFormat object, so let it simply copy here.
return self.__dict__
_ESCO_PARAMETERS_CVSD_D0 = EscoParameters(
transmit_coding_format=CodingFormat(CodecID.CVSD),
receive_coding_format=CodingFormat(CodecID.CVSD),
max_latency=0xFFFF,
packet_type=HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.HV1,
retransmission_effort=HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort.NO_RETRANSMISSION,
)
_ESCO_PARAMETERS_CVSD_D1 = EscoParameters(
transmit_coding_format=CodingFormat(CodecID.CVSD),
receive_coding_format=CodingFormat(CodecID.CVSD),
max_latency=0xFFFF,
packet_type=HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.HV3,
retransmission_effort=HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort.NO_RETRANSMISSION,
)
_ESCO_PARAMETERS_CVSD_S1 = EscoParameters(
transmit_coding_format=CodingFormat(CodecID.CVSD),
receive_coding_format=CodingFormat(CodecID.CVSD),
max_latency=0x0007,
packet_type=(
HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_2_EV3
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV3
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_2_EV5
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV5
),
retransmission_effort=HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort.OPTIMIZE_FOR_POWER,
)
_ESCO_PARAMETERS_CVSD_S2 = EscoParameters(
transmit_coding_format=CodingFormat(CodecID.CVSD),
receive_coding_format=CodingFormat(CodecID.CVSD),
max_latency=0x0007,
packet_type=(
HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV3
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_2_EV5
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV5
),
retransmission_effort=HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort.OPTIMIZE_FOR_POWER,
)
_ESCO_PARAMETERS_CVSD_S3 = EscoParameters(
transmit_coding_format=CodingFormat(CodecID.CVSD),
receive_coding_format=CodingFormat(CodecID.CVSD),
max_latency=0x000A,
packet_type=(
HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV3
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_2_EV5
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV5
),
retransmission_effort=HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort.OPTIMIZE_FOR_POWER,
)
_ESCO_PARAMETERS_CVSD_S4 = EscoParameters(
transmit_coding_format=CodingFormat(CodecID.CVSD),
receive_coding_format=CodingFormat(CodecID.CVSD),
max_latency=0x000C,
packet_type=(
HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV3
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_2_EV5
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV5
),
retransmission_effort=HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort.OPTIMIZE_FOR_QUALITY,
)
_ESCO_PARAMETERS_MSBC_T1 = EscoParameters(
transmit_coding_format=CodingFormat(CodecID.MSBC),
receive_coding_format=CodingFormat(CodecID.MSBC),
max_latency=0x0008,
packet_type=(
HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV3
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_2_EV5
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV5
),
input_bandwidth=32000,
output_bandwidth=32000,
retransmission_effort=HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort.OPTIMIZE_FOR_QUALITY,
)
_ESCO_PARAMETERS_MSBC_T2 = EscoParameters(
transmit_coding_format=CodingFormat(CodecID.MSBC),
receive_coding_format=CodingFormat(CodecID.MSBC),
max_latency=0x000D,
packet_type=(
HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_2_EV3
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV3
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_2_EV5
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV5
),
input_bandwidth=32000,
output_bandwidth=32000,
retransmission_effort=HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort.OPTIMIZE_FOR_QUALITY,
)
ESCO_PARAMETERS = {
DefaultCodecParameters.SCO_CVSD_D0: _ESCO_PARAMETERS_CVSD_D0,
DefaultCodecParameters.SCO_CVSD_D1: _ESCO_PARAMETERS_CVSD_D1,
DefaultCodecParameters.ESCO_CVSD_S1: _ESCO_PARAMETERS_CVSD_S1,
DefaultCodecParameters.ESCO_CVSD_S2: _ESCO_PARAMETERS_CVSD_S2,
DefaultCodecParameters.ESCO_CVSD_S3: _ESCO_PARAMETERS_CVSD_S3,
DefaultCodecParameters.ESCO_CVSD_S4: _ESCO_PARAMETERS_CVSD_S4,
DefaultCodecParameters.ESCO_MSBC_T1: _ESCO_PARAMETERS_MSBC_T1,
DefaultCodecParameters.ESCO_MSBC_T2: _ESCO_PARAMETERS_MSBC_T2,
}
+7 -8
View File
@@ -18,14 +18,15 @@
from __future__ import annotations
from dataclasses import dataclass
import logging
import asyncio
import enum
from pyee import EventEmitter
from typing import Optional, TYPE_CHECKING
from typing import Optional, Tuple, Callable, Dict, Union, TYPE_CHECKING
from bumble import l2cap
from bumble.colors import color
from bumble.core import InvalidStateError, ProtocolError
from . import core, l2cap # type: ignore
from .colors import color # type: ignore
from .core import BT_BR_EDR_TRANSPORT, InvalidStateError, ProtocolError # type: ignore
if TYPE_CHECKING:
from bumble.device import Device, Connection
@@ -301,12 +302,10 @@ class Host(EventEmitter):
self.send_pdu_on_ctrl(hid_message)
def send_pdu_on_ctrl(self, msg: bytes) -> None:
assert self.l2cap_ctrl_channel
self.l2cap_ctrl_channel.send_pdu(msg)
self.l2cap_ctrl_channel.send_pdu(msg) # type: ignore
def send_pdu_on_intr(self, msg: bytes) -> None:
assert self.l2cap_intr_channel
self.l2cap_intr_channel.send_pdu(msg)
self.l2cap_intr_channel.send_pdu(msg) # type: ignore
def send_data(self, data):
msg = SendData(data)
+11 -61
View File
@@ -21,7 +21,7 @@ import collections
import logging
import struct
from typing import Optional, TYPE_CHECKING, Dict, Callable, Awaitable, cast
from typing import Optional, TYPE_CHECKING, Dict, Callable, Awaitable
from bumble.colors import color
from bumble.l2cap import L2CAP_PDU
@@ -32,8 +32,8 @@ from .hci import (
Address,
HCI_ACL_DATA_PACKET,
HCI_COMMAND_PACKET,
HCI_COMMAND_COMPLETE_EVENT,
HCI_EVENT_PACKET,
HCI_ISO_DATA_PACKET,
HCI_LE_READ_BUFFER_SIZE_COMMAND,
HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND,
HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND,
@@ -43,7 +43,6 @@ from .hci import (
HCI_RESET_COMMAND,
HCI_SUCCESS,
HCI_SUPPORTED_COMMANDS_FLAGS,
HCI_SYNCHRONOUS_DATA_PACKET,
HCI_VERSION_BLUETOOTH_CORE_4_0,
HCI_AclDataPacket,
HCI_AclDataPacketAssembler,
@@ -52,7 +51,6 @@ from .hci import (
HCI_Constant,
HCI_Error,
HCI_Event,
HCI_IsoDataPacket,
HCI_LE_Long_Term_Key_Request_Negative_Reply_Command,
HCI_LE_Long_Term_Key_Request_Reply_Command,
HCI_LE_Read_Buffer_Size_Command,
@@ -69,13 +67,13 @@ from .hci import (
HCI_Read_Local_Version_Information_Command,
HCI_Reset_Command,
HCI_Set_Event_Mask_Command,
HCI_SynchronousDataPacket,
)
from .core import (
BT_BR_EDR_TRANSPORT,
BT_LE_TRANSPORT,
ConnectionPHY,
ConnectionParameters,
InvalidStateError,
)
from .utils import AbortableEventEmitter
from .transport.common import TransportLostError
@@ -243,7 +241,7 @@ class Host(AbortableEventEmitter):
# understand
le_event_mask = bytes.fromhex('1F00000000000000')
else:
le_event_mask = bytes.fromhex('FFFFFFFF00000000')
le_event_mask = bytes.fromhex('FFFFF00000000000')
await self.send_command(
HCI_LE_Set_Event_Mask_Command(le_event_mask=le_event_mask)
@@ -487,16 +485,12 @@ class Host(AbortableEventEmitter):
self.snooper.snoop(bytes(packet), Snooper.Direction.CONTROLLER_TO_HOST)
# If the packet is a command, invoke the handler for this packet
if packet.hci_packet_type == HCI_COMMAND_PACKET:
self.on_hci_command_packet(cast(HCI_Command, packet))
elif packet.hci_packet_type == HCI_EVENT_PACKET:
self.on_hci_event_packet(cast(HCI_Event, packet))
elif packet.hci_packet_type == HCI_ACL_DATA_PACKET:
self.on_hci_acl_data_packet(cast(HCI_AclDataPacket, packet))
elif packet.hci_packet_type == HCI_SYNCHRONOUS_DATA_PACKET:
self.on_hci_sco_data_packet(cast(HCI_SynchronousDataPacket, packet))
elif packet.hci_packet_type == HCI_ISO_DATA_PACKET:
self.on_hci_iso_data_packet(cast(HCI_IsoDataPacket, packet))
if isinstance(packet, HCI_Command):
self.on_hci_command_packet(packet)
elif isinstance(packet, HCI_Event):
self.on_hci_event_packet(packet)
elif isinstance(packet, HCI_AclDataPacket):
self.on_hci_acl_data_packet(packet)
else:
logger.warning(f'!!! unknown packet type {packet.hci_packet_type}')
@@ -513,14 +507,6 @@ class Host(AbortableEventEmitter):
if connection := self.connections.get(packet.connection_handle):
connection.on_hci_acl_data_packet(packet)
def on_hci_sco_data_packet(self, packet: HCI_SynchronousDataPacket) -> None:
# Experimental
self.emit('sco_packet', packet.connection_handle, packet)
def on_hci_iso_data_packet(self, packet: HCI_IsoDataPacket) -> None:
# Experimental
self.emit('iso_packet', packet.connection_handle, packet)
def on_l2cap_pdu(self, connection: Connection, cid: int, pdu: bytes) -> None:
self.emit('l2cap_pdu', connection.handle, cid, pdu)
@@ -721,24 +707,6 @@ class Host(AbortableEventEmitter):
def on_hci_le_extended_advertising_report_event(self, event):
self.on_hci_le_advertising_report_event(event)
def on_hci_le_cis_request_event(self, event):
self.emit(
'cis_request',
event.acl_connection_handle,
event.cis_connection_handle,
event.cig_id,
event.cis_id,
)
def on_hci_le_cis_established_event(self, event):
# The remaining parameters are unused for now.
if event.status == HCI_SUCCESS:
self.emit('cis_establishment', event.connection_handle)
else:
self.emit(
'cis_establishment_failure', event.connection_handle, event.status
)
def on_hci_le_remote_connection_parameter_request_event(self, event):
if event.connection_handle not in self.connections:
logger.warning('!!! REMOTE CONNECTION PARAMETER REQUEST: unknown handle')
@@ -792,25 +760,7 @@ class Host(AbortableEventEmitter):
asyncio.create_task(send_long_term_key())
def on_hci_synchronous_connection_complete_event(self, event):
if event.status == HCI_SUCCESS:
# Create/update the connection
logger.debug(
f'### SCO CONNECTION: [0x{event.connection_handle:04X}] '
f'{event.bd_addr}'
)
# Notify the client
self.emit(
'sco_connection',
event.bd_addr,
event.connection_handle,
event.link_type,
)
else:
logger.debug(f'### SCO CONNECTION FAILED: {event.status}')
# Notify the client
self.emit('sco_connection_failure', event.bd_addr, event.status)
pass
def on_hci_synchronous_connection_changed_event(self, event):
pass
+386 -214
View File
@@ -35,8 +35,10 @@ from typing import (
Union,
Deque,
Iterable,
Set,
SupportsBytes,
TYPE_CHECKING,
overload,
)
from .utils import deprecated
@@ -237,6 +239,8 @@ class L2CAP_Control_Frame:
classes: Dict[int, Type[L2CAP_Control_Frame]] = {}
code = 0
name: str
identifier: int
pdu: bytes
@staticmethod
def from_bytes(pdu: bytes) -> L2CAP_Control_Frame:
@@ -435,11 +439,6 @@ class L2CAP_Connection_Response(L2CAP_Control_Frame):
See Bluetooth spec @ Vol 3, Part A - 4.3 CONNECTION RESPONSE
'''
source_cid: int
destination_cid: int
status: int
result: int
CONNECTION_SUCCESSFUL = 0x0000
CONNECTION_PENDING = 0x0001
CONNECTION_REFUSED_PSM_NOT_SUPPORTED = 0x0002
@@ -645,7 +644,11 @@ class L2CAP_LE_Credit_Based_Connection_Request(L2CAP_Control_Frame):
(CODE 0x14)
'''
le_psm: int
source_cid: int
mtu: int
mps: int
initial_credits: int
# -----------------------------------------------------------------------------
@@ -1383,19 +1386,14 @@ class LeCreditBasedChannel(EventEmitter):
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class ClassicChannelServer(EventEmitter):
def __init__(
self,
manager: ChannelManager,
psm: int,
handler: Optional[Callable[[ClassicChannel], Any]],
mtu: int,
) -> None:
_close_closure: Callable[[], None]
psm: int
handler: Optional[Callable[[ClassicChannel], Any]]
def __post_init__(self) -> None:
super().__init__()
self.manager = manager
self.handler = handler
self.psm = psm
self.mtu = mtu
def on_connection(self, channel: ClassicChannel) -> None:
self.emit('connection', channel)
@@ -1403,28 +1401,18 @@ class ClassicChannelServer(EventEmitter):
self.handler(channel)
def close(self) -> None:
if self.psm in self.manager.servers:
del self.manager.servers[self.psm]
self._close_closure()
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class LeCreditBasedChannelServer(EventEmitter):
def __init__(
self,
manager: ChannelManager,
psm: int,
handler: Optional[Callable[[LeCreditBasedChannel], Any]],
max_credits: int,
mtu: int,
mps: int,
) -> None:
_close_closure: Callable[[], None]
psm: int
handler: Optional[Callable[[LeCreditBasedChannel], Any]]
def __post_init__(self) -> None:
super().__init__()
self.manager = manager
self.handler = handler
self.psm = psm
self.max_credits = max_credits
self.mtu = mtu
self.mps = mps
def on_connection(self, channel: LeCreditBasedChannel) -> None:
self.emit('connection', channel)
@@ -1432,21 +1420,107 @@ class LeCreditBasedChannelServer(EventEmitter):
self.handler(channel)
def close(self) -> None:
if self.psm in self.manager.le_coc_servers:
del self.manager.le_coc_servers[self.psm]
self._close_closure()
# -----------------------------------------------------------------------------
class PendingConnection:
"""
All pending connection types.
A `PendingConnection` is a temporary object used to accept an incoming connection
request, it contains the acceptor channel configuration preferences and transition
to the connected state through the `on_connection` callback.
This object is not supposed to live anymore once the channel is connected.
"""
class Any:
"""L2CAP any channel pending connection."""
on_connection: Callable[[Any], None]
mtu: int
@dataclasses.dataclass
class Basic(Any):
"""L2CAP basic channel pending connection."""
on_connection: Callable[[ClassicChannel], None] = lambda _: None
mtu: int = L2CAP_MIN_BR_EDR_MTU
@dataclasses.dataclass
class LeCreditBased(Any):
"""L2CAP LE credit based channel pending connection."""
on_connection: Callable[[LeCreditBasedChannel], None] = lambda _: None
mtu: int = L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MTU
mps: int = L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS
max_credits: int = L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_INITIAL_CREDITS
# -----------------------------------------------------------------------------
class IncomingConnection:
"""
All incoming connection types.
A `IncomingConnection` is a temporary object used to notify listeners of an
incoming channel connection request. It can accepted through the `future` field.
Multiple listeners can observe the same incoming connection request, but no more
than one can actually accept, first come first served. Thus it's recommended for
delayed accept to before check the state of the future field.
This object is not supposed to live anymore once accepted.
Example:
```python
fut = asyncio.Future()
def listener(incoming: IncomingConnection.Any) -> None:
if isinstance(incoming, IncomingConnection.Basic) and incoming.psm == 0xcafe:
incoming.future.set_result(PendingConnection.Basic(fut.set_result, mtu=123))
device.l2cap_manager.listen(listener)
channel = await fut
```
"""
@dataclasses.dataclass
class Any:
"""L2CAP any incoming channel connection request."""
connection: Connection
psm: int
source_cid: int
def __post_init__(self) -> None:
self.future: asyncio.Future[Any] = asyncio.Future()
@dataclasses.dataclass
class Basic(Any):
"""L2CAP incoming basic channel connection request."""
future: asyncio.Future[PendingConnection.Basic] = dataclasses.field(init=False)
@dataclasses.dataclass
class LeCreditBased(Any):
"""L2CAP incoming LE credit based channel connection request."""
mtu: int
mps: int
initial_credits: int
future: asyncio.Future[PendingConnection.LeCreditBased] = dataclasses.field(
init=False
)
# -----------------------------------------------------------------------------
class ChannelManager:
identifiers: Dict[int, int]
channels: Dict[int, Dict[int, Union[ClassicChannel, LeCreditBasedChannel]]]
servers: Dict[int, ClassicChannelServer]
le_coc_channels: Dict[int, Dict[int, LeCreditBasedChannel]]
le_coc_servers: Dict[int, LeCreditBasedChannelServer]
le_coc_requests: Dict[int, L2CAP_LE_Credit_Based_Connection_Request]
fixed_channels: Dict[int, Optional[Callable[[int, bytes], Any]]]
_host: Optional[Host]
connection_parameters_update_response: Optional[asyncio.Future[int]]
listeners: List[Callable[[IncomingConnection.Any], None]]
used_psm: Set[int]
def __init__(
self,
@@ -1460,15 +1534,15 @@ class ChannelManager:
L2CAP_SIGNALING_CID: None,
L2CAP_LE_SIGNALING_CID: None,
}
self.servers = {} # Servers accepting connections, by PSM
self.le_coc_channels = (
{}
) # LE CoC channels, mapped by connection and destination cid
self.le_coc_servers = {} # LE CoC - Servers accepting connections, by PSM
self.le_coc_requests = {} # LE CoC connection requests, by identifier
self.extended_features = extended_features
self.connectionless_mtu = connectionless_mtu
self.connection_parameters_update_response = None
self.listeners = []
self.used_psm = set()
@property
def host(self) -> Host:
@@ -1521,6 +1595,31 @@ class ChannelManager:
raise RuntimeError('no free CID')
def allocate_psm(self) -> int:
# Find a free PSM
for candidate in range(
L2CAP_PSM_DYNAMIC_RANGE_START, L2CAP_PSM_DYNAMIC_RANGE_END + 1, 2
):
if (candidate >> 8) % 2 == 1:
continue
if candidate in self.used_psm:
continue
return candidate
raise InvalidStateError('no free PSM')
def allocate_spsm(self) -> int:
# Find a free sPSM
for candidate in range(
L2CAP_LE_PSM_DYNAMIC_RANGE_START, L2CAP_LE_PSM_DYNAMIC_RANGE_END + 1
):
if candidate in self.used_psm:
continue
return candidate
raise InvalidStateError('no free PSM')
def free_psm(self, psm: int) -> None:
self.used_psm.remove(psm)
def next_identifier(self, connection: Connection) -> int:
identifier = (self.identifiers.setdefault(connection.handle, 0) + 1) % 256
self.identifiers[connection.handle] = identifier
@@ -1535,6 +1634,35 @@ class ChannelManager:
if cid in self.fixed_channels:
del self.fixed_channels[cid]
@overload
def listen(
self, cb: Callable[[IncomingConnection.Basic], None]
) -> Callable[[IncomingConnection.Basic], None]:
...
@overload
def listen(
self, cb: Callable[[IncomingConnection.LeCreditBased], None]
) -> Callable[[IncomingConnection.LeCreditBased], None]:
...
def listen(self, cb: Any) -> Any:
if cb in self.listeners:
raise ValueError('listener already registered')
self.listeners.append(cb)
return cb
@overload
def unlisten(self, cb: Callable[[IncomingConnection.Basic], None]) -> None:
...
@overload
def unlisten(self, cb: Callable[[IncomingConnection.LeCreditBased], None]) -> None:
...
def unlisten(self, cb: Any) -> None:
self.listeners.remove(cb)
@deprecated("Please use create_classic_server")
def register_server(
self,
@@ -1542,7 +1670,7 @@ class ChannelManager:
server: Callable[[ClassicChannel], Any],
) -> int:
return self.create_classic_server(
handler=server, spec=ClassicChannelSpec(psm=psm)
handler=server, spec=ClassicChannelSpec(psm=None if psm == 0 else psm)
).psm
def create_classic_server(
@@ -1550,24 +1678,12 @@ class ChannelManager:
spec: ClassicChannelSpec,
handler: Optional[Callable[[ClassicChannel], Any]] = None,
) -> ClassicChannelServer:
if not spec.psm:
# Find a free PSM
for candidate in range(
L2CAP_PSM_DYNAMIC_RANGE_START, L2CAP_PSM_DYNAMIC_RANGE_END + 1, 2
):
if (candidate >> 8) % 2 == 1:
continue
if candidate in self.servers:
continue
spec.psm = candidate
break
else:
raise InvalidStateError('no free PSM')
server: ClassicChannelServer
if spec.psm is None:
spec.psm = self.allocate_psm()
else:
# Check that the PSM isn't already in use
if spec.psm in self.servers:
raise ValueError('PSM already in use')
if spec.psm is self.used_psm:
raise ValueError(f'{spec.psm}: PSM already in use')
# Check that the PSM is valid
if spec.psm % 2 == 0:
raise ValueError('invalid PSM (not odd)')
@@ -1576,10 +1692,22 @@ class ChannelManager:
if check % 2 != 0:
raise ValueError('invalid PSM')
check >>= 8
self.used_psm.add(spec.psm)
self.servers[spec.psm] = ClassicChannelServer(self, spec.psm, handler, spec.mtu)
def listener(incoming: IncomingConnection.Basic) -> None:
if incoming.psm == spec.psm:
incoming.future.set_result(
PendingConnection.Basic(server.on_connection, spec.mtu)
)
return self.servers[spec.psm]
def close() -> None:
self.unlisten(listener)
assert spec.psm is not None
self.free_psm(spec.psm)
self.listen(listener)
server = ClassicChannelServer(close, spec.psm, handler)
return server
@deprecated("Please use create_le_credit_based_server()")
def register_le_coc_server(
@@ -1602,32 +1730,30 @@ class ChannelManager:
spec: LeCreditBasedChannelSpec,
handler: Optional[Callable[[LeCreditBasedChannel], Any]] = None,
) -> LeCreditBasedChannelServer:
if not spec.psm:
# Find a free PSM
for candidate in range(
L2CAP_LE_PSM_DYNAMIC_RANGE_START, L2CAP_LE_PSM_DYNAMIC_RANGE_END + 1
):
if candidate in self.le_coc_servers:
continue
spec.psm = candidate
break
else:
raise InvalidStateError('no free PSM')
server: LeCreditBasedChannelServer
if spec.psm is None:
spec.psm = self.allocate_psm()
else:
# Check that the PSM isn't already in use
if spec.psm in self.le_coc_servers:
raise ValueError('PSM already in use')
if spec.psm is self.used_psm:
raise ValueError(f'{spec.psm}: SPSM already in use')
self.used_psm.add(spec.psm)
self.le_coc_servers[spec.psm] = LeCreditBasedChannelServer(
self,
spec.psm,
handler,
max_credits=spec.max_credits,
mtu=spec.mtu,
mps=spec.mps,
)
def listener(incoming: IncomingConnection.LeCreditBased) -> None:
if incoming.psm == spec.psm:
incoming.future.set_result(
PendingConnection.LeCreditBased(
server.on_connection, spec.mtu, spec.mps, spec.max_credits
)
)
return self.le_coc_servers[spec.psm]
def close() -> None:
self.unlisten(listener)
assert spec.psm is not None
self.free_psm(spec.psm)
self.listen(listener)
server = LeCreditBasedChannelServer(close, spec.psm, handler)
return server
def on_disconnection(self, connection_handle: int, _reason: int) -> None:
logger.debug(f'disconnection from {connection_handle}, cleaning up channels')
@@ -1727,15 +1853,62 @@ class ChannelManager:
logger.warning(f'{color("!!! Command rejected:", "red")} {packet.reason}')
def on_l2cap_connection_request(
self, connection: Connection, cid: int, request
self, connection: Connection, cid: int, request: L2CAP_Connection_Request
) -> None:
# Check if there's a server for this PSM
server = self.servers.get(request.psm)
if server:
# Find a free CID for this new channel
connection_channels = self.channels.setdefault(connection.handle, {})
source_cid = self.find_free_br_edr_cid(connection_channels)
if source_cid is None: # Should never happen!
# Asynchronous connection request handling.
async def handle_connection_request() -> None:
incoming = IncomingConnection.Basic(
connection, request.psm, request.source_cid
)
# Dispatch incoming connection.
for listener in self.listeners:
if not incoming.future.done():
listener(incoming)
try:
pending = await asyncio.wait_for(incoming.future, timeout=3.0)
except asyncio.TimeoutError as e:
incoming.future.cancel(e)
pending = None
if pending:
# Find a free CID for this new channel
connection_channels = self.channels.setdefault(connection.handle, {})
source_cid = self.find_free_br_edr_cid(connection_channels)
if source_cid is None: # Should never happen!
self.send_control_frame(
connection,
cid,
L2CAP_Connection_Response(
identifier=request.identifier,
destination_cid=request.source_cid,
source_cid=0,
# pylint: disable=line-too-long
result=L2CAP_Connection_Response.CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE,
status=0x0000,
),
)
return
# Create a new channel
logger.debug(
f'creating server channel with cid={source_cid} for psm {request.psm}'
)
channel = ClassicChannel(
self, connection, cid, request.psm, source_cid, pending.mtu
)
connection_channels[source_cid] = channel
# Notify
pending.on_connection(channel)
channel.on_connection_request(request)
else:
logger.warning(
f'No server for connection 0x{connection.handle:04X} '
f'on PSM {request.psm}'
)
self.send_control_frame(
connection,
cid,
@@ -1744,41 +1917,13 @@ class ChannelManager:
destination_cid=request.source_cid,
source_cid=0,
# pylint: disable=line-too-long
result=L2CAP_Connection_Response.CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE,
result=L2CAP_Connection_Response.CONNECTION_REFUSED_PSM_NOT_SUPPORTED,
status=0x0000,
),
)
return
# Create a new channel
logger.debug(
f'creating server channel with cid={source_cid} for psm {request.psm}'
)
channel = ClassicChannel(
self, connection, cid, request.psm, source_cid, server.mtu
)
connection_channels[source_cid] = channel
# Notify
server.on_connection(channel)
channel.on_connection_request(request)
else:
logger.warning(
f'No server for connection 0x{connection.handle:04X} '
f'on PSM {request.psm}'
)
self.send_control_frame(
connection,
cid,
L2CAP_Connection_Response(
identifier=request.identifier,
destination_cid=request.source_cid,
source_cid=0,
# pylint: disable=line-too-long
result=L2CAP_Connection_Response.CONNECTION_REFUSED_PSM_NOT_SUPPORTED,
status=0x0000,
),
)
# Spawn connection request handling.
connection.abort_on('disconnection', handle_connection_request())
def on_l2cap_connection_response(
self, connection: Connection, cid: int, response
@@ -1979,108 +2124,135 @@ class ChannelManager:
)
def on_l2cap_le_credit_based_connection_request(
self, connection: Connection, cid: int, request
self,
connection: Connection,
cid: int,
request: L2CAP_LE_Credit_Based_Connection_Request,
) -> None:
if request.le_psm in self.le_coc_servers:
server = self.le_coc_servers[request.le_psm]
# Check that the CID isn't already used
le_connection_channels = self.le_coc_channels.setdefault(
connection.handle, {}
)
if request.source_cid in le_connection_channels:
logger.warning(f'source CID {request.source_cid} already in use')
self.send_control_frame(
connection,
cid,
L2CAP_LE_Credit_Based_Connection_Response(
identifier=request.identifier,
destination_cid=0,
mtu=server.mtu,
mps=server.mps,
initial_credits=0,
# pylint: disable=line-too-long
result=L2CAP_LE_Credit_Based_Connection_Response.CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED,
),
)
return
# Find a free CID for this new channel
connection_channels = self.channels.setdefault(connection.handle, {})
source_cid = self.find_free_le_cid(connection_channels)
if source_cid is None: # Should never happen!
self.send_control_frame(
connection,
cid,
L2CAP_LE_Credit_Based_Connection_Response(
identifier=request.identifier,
destination_cid=0,
mtu=server.mtu,
mps=server.mps,
initial_credits=0,
# pylint: disable=line-too-long
result=L2CAP_LE_Credit_Based_Connection_Response.CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE,
),
)
return
# Create a new channel
logger.debug(
f'creating LE CoC server channel with cid={source_cid} for psm '
f'{request.le_psm}'
)
channel = LeCreditBasedChannel(
self,
# Asynchronous connection request handling.
async def handle_connection_request() -> None:
incoming = IncomingConnection.LeCreditBased(
connection,
request.le_psm,
source_cid,
request.source_cid,
server.mtu,
server.mps,
request.initial_credits,
request.mtu,
request.mps,
server.max_credits,
True,
)
connection_channels[source_cid] = channel
le_connection_channels[request.source_cid] = channel
# Respond
self.send_control_frame(
connection,
cid,
L2CAP_LE_Credit_Based_Connection_Response(
identifier=request.identifier,
destination_cid=source_cid,
mtu=server.mtu,
mps=server.mps,
initial_credits=server.max_credits,
# pylint: disable=line-too-long
result=L2CAP_LE_Credit_Based_Connection_Response.CONNECTION_SUCCESSFUL,
),
request.initial_credits,
)
# Notify
server.on_connection(channel)
else:
logger.info(
f'No LE server for connection 0x{connection.handle:04X} '
f'on PSM {request.le_psm}'
)
self.send_control_frame(
connection,
cid,
L2CAP_LE_Credit_Based_Connection_Response(
identifier=request.identifier,
destination_cid=0,
mtu=L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MTU,
mps=L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS,
initial_credits=0,
# pylint: disable=line-too-long
result=L2CAP_LE_Credit_Based_Connection_Response.CONNECTION_REFUSED_LE_PSM_NOT_SUPPORTED,
),
)
# Dispatch incoming connection.
for listener in self.listeners:
if not incoming.future.done():
listener(incoming)
try:
pending = await asyncio.wait_for(incoming.future, timeout=3.0)
except asyncio.TimeoutError as e:
incoming.future.cancel(e)
pending = None
if pending:
# Check that the CID isn't already used
le_connection_channels = self.le_coc_channels.setdefault(
connection.handle, {}
)
if request.source_cid in le_connection_channels:
logger.warning(f'source CID {request.source_cid} already in use')
self.send_control_frame(
connection,
cid,
L2CAP_LE_Credit_Based_Connection_Response(
identifier=request.identifier,
destination_cid=0,
mtu=pending.mtu,
mps=pending.mps,
initial_credits=0,
# pylint: disable=line-too-long
result=L2CAP_LE_Credit_Based_Connection_Response.CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED,
),
)
return
# Find a free CID for this new channel
connection_channels = self.channels.setdefault(connection.handle, {})
source_cid = self.find_free_le_cid(connection_channels)
if source_cid is None: # Should never happen!
self.send_control_frame(
connection,
cid,
L2CAP_LE_Credit_Based_Connection_Response(
identifier=request.identifier,
destination_cid=0,
mtu=pending.mtu,
mps=pending.mps,
initial_credits=0,
# pylint: disable=line-too-long
result=L2CAP_LE_Credit_Based_Connection_Response.CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE,
),
)
return
# Create a new channel
logger.debug(
f'creating LE CoC server channel with cid={source_cid} for psm '
f'{request.le_psm}'
)
channel = LeCreditBasedChannel(
self,
connection,
request.le_psm,
source_cid,
request.source_cid,
pending.mtu,
pending.mps,
request.initial_credits,
request.mtu,
request.mps,
pending.max_credits,
True,
)
connection_channels[source_cid] = channel
le_connection_channels[request.source_cid] = channel
# Respond
self.send_control_frame(
connection,
cid,
L2CAP_LE_Credit_Based_Connection_Response(
identifier=request.identifier,
destination_cid=source_cid,
mtu=pending.mtu,
mps=pending.mps,
initial_credits=pending.max_credits,
# pylint: disable=line-too-long
result=L2CAP_LE_Credit_Based_Connection_Response.CONNECTION_SUCCESSFUL,
),
)
# Notify
pending.on_connection(channel)
else:
logger.info(
f'No LE server for connection 0x{connection.handle:04X} '
f'on PSM {request.le_psm}'
)
self.send_control_frame(
connection,
cid,
L2CAP_LE_Credit_Based_Connection_Response(
identifier=request.identifier,
destination_cid=0,
mtu=L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MTU,
mps=L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS,
initial_credits=0,
# pylint: disable=line-too-long
result=L2CAP_LE_Credit_Based_Connection_Response.CONNECTION_REFUSED_LE_PSM_NOT_SUPPORTED,
),
)
# Spawn connection request handling.
connection.abort_on('disconnection', handle_connection_request())
def on_l2cap_le_credit_based_connection_response(
self, connection: Connection, _cid: int, response
+1 -67
View File
@@ -15,9 +15,7 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import enum
from dataclasses import dataclass
from typing import Optional, Tuple
from .hci import (
@@ -37,60 +35,7 @@ from .smp import (
SMP_ID_KEY_DISTRIBUTION_FLAG,
SMP_SIGN_KEY_DISTRIBUTION_FLAG,
SMP_LINK_KEY_DISTRIBUTION_FLAG,
OobContext,
OobLegacyContext,
OobSharedData,
)
from .core import AdvertisingData, LeRole
# -----------------------------------------------------------------------------
@dataclass
class OobData:
"""OOB data that can be sent from one device to another."""
address: Optional[Address] = None
role: Optional[LeRole] = None
shared_data: Optional[OobSharedData] = None
legacy_context: Optional[OobLegacyContext] = None
@classmethod
def from_ad(cls, ad: AdvertisingData) -> OobData:
instance = cls()
shared_data_c: Optional[bytes] = None
shared_data_r: Optional[bytes] = None
for ad_type, ad_data in ad.ad_structures:
if ad_type == AdvertisingData.LE_BLUETOOTH_DEVICE_ADDRESS:
instance.address = Address(ad_data)
elif ad_type == AdvertisingData.LE_ROLE:
instance.role = LeRole(ad_data[0])
elif ad_type == AdvertisingData.LE_SECURE_CONNECTIONS_CONFIRMATION_VALUE:
shared_data_c = ad_data
elif ad_type == AdvertisingData.LE_SECURE_CONNECTIONS_RANDOM_VALUE:
shared_data_r = ad_data
elif ad_type == AdvertisingData.SECURITY_MANAGER_TK_VALUE:
instance.legacy_context = OobLegacyContext(tk=ad_data)
if shared_data_c and shared_data_r:
instance.shared_data = OobSharedData(c=shared_data_c, r=shared_data_r)
return instance
def to_ad(self) -> AdvertisingData:
ad_structures = []
if self.address is not None:
ad_structures.append(
(AdvertisingData.LE_BLUETOOTH_DEVICE_ADDRESS, bytes(self.address))
)
if self.role is not None:
ad_structures.append((AdvertisingData.LE_ROLE, bytes([self.role])))
if self.shared_data is not None:
ad_structures.extend(self.shared_data.to_ad().ad_structures)
if self.legacy_context is not None:
ad_structures.append(
(AdvertisingData.SECURITY_MANAGER_TK_VALUE, self.legacy_context.tk)
)
return AdvertisingData(ad_structures)
# -----------------------------------------------------------------------------
@@ -228,14 +173,6 @@ class PairingConfig:
PUBLIC = Address.PUBLIC_DEVICE_ADDRESS
RANDOM = Address.RANDOM_DEVICE_ADDRESS
@dataclass
class OobConfig:
"""Config for OOB pairing."""
our_context: Optional[OobContext]
peer_data: Optional[OobSharedData]
legacy_context: Optional[OobLegacyContext]
def __init__(
self,
sc: bool = True,
@@ -243,20 +180,17 @@ class PairingConfig:
bonding: bool = True,
delegate: Optional[PairingDelegate] = None,
identity_address_type: Optional[AddressType] = None,
oob: Optional[OobConfig] = None,
) -> None:
self.sc = sc
self.mitm = mitm
self.bonding = bonding
self.delegate = delegate or PairingDelegate()
self.identity_address_type = identity_address_type
self.oob = oob
def __str__(self) -> str:
return (
f'PairingConfig(sc={self.sc}, '
f'mitm={self.mitm}, bonding={self.bonding}, '
f'identity_address_type={self.identity_address_type}, '
f'delegate[{self.delegate.io_capability}]), '
f'oob[{self.oob}])'
f'delegate[{self.delegate.io_capability}])'
)
+3
View File
@@ -26,11 +26,13 @@ from .config import Config
from .device import PandoraDevice
from .host import HostService
from .security import SecurityService, SecurityStorageService
from .l2cap import L2CAPService
from pandora.host_grpc_aio import add_HostServicer_to_server
from pandora.security_grpc_aio import (
add_SecurityServicer_to_server,
add_SecurityStorageServicer_to_server,
)
from pandora.l2cap_grpc_aio import add_L2CAPServicer_to_server
from typing import Callable, List, Optional
# public symbols
@@ -77,6 +79,7 @@ async def serve(
add_SecurityStorageServicer_to_server(
SecurityStorageService(bumble.device, config), server
)
add_L2CAPServicer_to_server(L2CAPService(bumble.device, config), server)
# call hooks if any.
for hook in _SERVICERS_HOOKS:
+289
View File
@@ -0,0 +1,289 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import dataclasses
import grpc
import struct
from bumble import device
from bumble import l2cap
from bumble.pandora import config
from bumble.pandora import utils
from bumble.utils import EventWatcher
from google.protobuf import any_pb2 # pytype: disable=pyi-error
from google.protobuf import empty_pb2 # pytype: disable=pyi-error
from pandora import l2cap_pb2
from pandora import l2cap_grpc_aio
from typing import Any, AsyncGenerator, AsyncIterator, Dict, List, Union
@dataclasses.dataclass
class ChannelProxy:
channel: Union[l2cap.ClassicChannel, l2cap.LeCreditBasedChannel, None]
def __post_init__(self) -> None:
assert self.channel
self.rx: asyncio.Queue[bytes] = asyncio.Queue()
self._disconnection_result: asyncio.Future[None] = asyncio.Future()
self.channel.sink = self.rx.put_nowait
def on_close() -> None:
assert not self._disconnection_result.done()
self.channel = None
self._disconnection_result.set_result(None)
self.channel.on('close', on_close)
def send(self, data: bytes) -> None:
assert self.channel
if isinstance(self.channel, l2cap.ClassicChannel):
self.channel.send_pdu(data)
else:
self.channel.write(data)
async def disconnect(self) -> None:
assert self.channel
await self.channel.disconnect()
async def wait_disconnect(self) -> None:
await self._disconnection_result
assert not self.channel
@dataclasses.dataclass
class ChannelIndex:
connection_handle: int
cid: int
@classmethod
def from_token(cls, token: l2cap_pb2.Channel) -> 'ChannelIndex':
connection_handle, cid = struct.unpack('>HH', token.cookie.value)
return cls(connection_handle, cid)
def into_token(self) -> l2cap_pb2.Channel:
return l2cap_pb2.Channel(
cookie=any_pb2.Any(
value=struct.pack('>HH', self.connection_handle, self.cid)
)
)
def __hash__(self):
return hash(self.connection_handle | (self.cid << 12))
class L2CAPService(l2cap_grpc_aio.L2CAPServicer):
channels: Dict[ChannelIndex, ChannelProxy] = {}
pending: List[l2cap.IncomingConnection.Any] = []
accepts: List[asyncio.Queue[l2cap.IncomingConnection.Any]] = []
def __init__(self, dev: device.Device, config: config.Config) -> None:
self.device = dev
self.config = config
def on_connection(incoming: l2cap.IncomingConnection.Any) -> None:
self.pending.append(incoming)
for acceptor in self.accepts:
acceptor.put_nowait(incoming)
# Make sure our listener is called before the builtins ones.
self.device.l2cap_channel_manager.listeners.insert(0, on_connection)
def register(self, index: ChannelIndex, proxy: ChannelProxy) -> None:
self.channels[index] = proxy
def on_close(*_: Any) -> None:
# TODO: Fix Bumble L2CAP which emit `close` event twice.
if index in self.channels:
del self.channels[index]
# Listen for disconnection.
assert proxy.channel
proxy.channel.on('close', on_close)
async def listen(self) -> AsyncIterator[l2cap.IncomingConnection.Any]:
for incoming in self.pending:
if incoming.future.done():
self.pending.remove(incoming)
continue
yield incoming
queue: asyncio.Queue[l2cap.IncomingConnection.Any] = asyncio.Queue()
self.accepts.append(queue)
try:
while incoming := await queue.get():
yield incoming
finally:
self.accepts.remove(queue)
@utils.rpc
async def Connect(
self, request: l2cap_pb2.ConnectRequest, context: grpc.ServicerContext
) -> l2cap_pb2.ConnectResponse:
# Retrieve Bumble `Connection` from request.
connection_handle = int.from_bytes(request.connection.cookie.value, 'big')
connection = self.device.lookup_connection(connection_handle)
if connection is None:
raise RuntimeError(f'{connection_handle}: not connection for handle')
channel: Union[l2cap.ClassicChannel, l2cap.LeCreditBasedChannel]
if request.type_variant() == 'basic':
assert request.basic
channel = await connection.create_l2cap_channel(
spec=l2cap.ClassicChannelSpec(
psm=request.basic.psm, mtu=request.basic.mtu
)
)
elif request.type_variant() == 'le_credit_based':
assert request.le_credit_based
channel = await connection.create_l2cap_channel(
spec=l2cap.LeCreditBasedChannelSpec(
psm=request.le_credit_based.spsm,
max_credits=request.le_credit_based.initial_credit,
mtu=request.le_credit_based.mtu,
mps=request.le_credit_based.mps,
)
)
else:
raise NotImplementedError(f"{request.type_variant()}: unsupported type")
index = ChannelIndex(channel.connection.handle, channel.source_cid)
self.register(index, ChannelProxy(channel))
return l2cap_pb2.ConnectResponse(channel=index.into_token())
@utils.rpc
async def WaitConnection(
self, request: l2cap_pb2.WaitConnectionRequest, context: grpc.ServicerContext
) -> l2cap_pb2.WaitConnectionResponse:
iter = self.listen()
fut: asyncio.Future[
Union[l2cap.ClassicChannel, l2cap.LeCreditBasedChannel]
] = asyncio.Future()
# Filter by connection.
if request.connection:
handle = int.from_bytes(request.connection.cookie.value, 'big')
iter = (it async for it in iter if it.connection.handle == handle)
if request.type_variant() == 'basic':
assert request.basic
basic = l2cap.PendingConnection.Basic(
fut.set_result,
request.basic.mtu or l2cap.L2CAP_MIN_BR_EDR_MTU,
)
async for i in (
it
async for it in iter
if isinstance(it, l2cap.IncomingConnection.Basic)
):
if not i.future.done() and i.psm == request.basic.psm:
i.future.set_result(basic)
break
elif request.type_variant() == 'le_credit_based':
assert request.le_credit_based
le_credit_based = l2cap.PendingConnection.LeCreditBased(
fut.set_result,
request.le_credit_based.mtu
or l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MTU,
request.le_credit_based.mps
or l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS,
request.le_credit_based.initial_credit
or l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_INITIAL_CREDITS,
)
async for j in (
it
async for it in iter
if isinstance(it, l2cap.IncomingConnection.LeCreditBased)
):
if not j.future.done() and j.psm == request.le_credit_based.spsm:
j.future.set_result(le_credit_based)
break
else:
raise NotImplementedError(f"{request.type_variant()}: unsupported type")
channel = await fut
index = ChannelIndex(channel.connection.handle, channel.source_cid)
self.register(index, ChannelProxy(channel))
return l2cap_pb2.WaitConnectionResponse(channel=index.into_token())
@utils.rpc
async def Disconnect(
self, request: l2cap_pb2.DisconnectRequest, context: grpc.ServicerContext
) -> l2cap_pb2.DisconnectResponse:
channel = self.channels[ChannelIndex.from_token(request.channel)]
await channel.disconnect()
return l2cap_pb2.DisconnectResponse(success=empty_pb2.Empty())
@utils.rpc
async def WaitDisconnection(
self, request: l2cap_pb2.WaitDisconnectionRequest, context: grpc.ServicerContext
) -> l2cap_pb2.WaitDisconnectionResponse:
channel = self.channels[ChannelIndex.from_token(request.channel)]
await channel.wait_disconnect()
return l2cap_pb2.WaitDisconnectionResponse(success=empty_pb2.Empty())
@utils.rpc
async def Receive(
self, request: l2cap_pb2.ReceiveRequest, context: grpc.ServicerContext
) -> AsyncGenerator[l2cap_pb2.ReceiveResponse, None]:
watcher = EventWatcher()
if request.source_variant() == 'channel':
assert request.channel
channel = self.channels[ChannelIndex.from_token(request.channel)]
rx = channel.rx
elif request.source_variant() == 'fixed_channel':
assert request.fixed_channel
rx = asyncio.Queue()
handle = request.fixed_channel.connection is not None and int.from_bytes(
request.fixed_channel.connection.cookie.value, 'big'
)
@watcher.on(self.device.host, 'l2cap_pdu')
def _(connection: device.Connection, cid: int, pdu: bytes) -> None:
assert request.fixed_channel
if cid == request.fixed_channel.cid and (
handle is None or handle == connection.handle
):
rx.put_nowait(pdu)
else:
raise NotImplementedError(f"{request.source_variant()}: unsupported type")
try:
while data := await rx.get():
yield l2cap_pb2.ReceiveResponse(data=data)
finally:
watcher.close()
@utils.rpc
async def Send(
self, request: l2cap_pb2.SendRequest, context: grpc.ServicerContext
) -> l2cap_pb2.SendResponse:
if request.sink_variant() == 'channel':
assert request.channel
channel = self.channels[ChannelIndex.from_token(request.channel)]
channel.send(request.data)
elif request.sink_variant() == 'fixed_channel':
assert request.fixed_channel
# Retrieve Bumble `Connection` from request.
connection_handle = int.from_bytes(
request.fixed_channel.connection.cookie.value, 'big'
)
connection = self.device.lookup_connection(connection_handle)
if connection is None:
raise RuntimeError(f'{connection_handle}: not connection for handle')
self.device.l2cap_channel_manager.send_pdu(
connection, request.fixed_channel.cid, request.data
)
else:
raise NotImplementedError(f"{request.sink_variant()}: unsupported type")
return l2cap_pb2.SendResponse(success=empty_pb2.Empty())
-147
View File
@@ -1,147 +0,0 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import enum
import struct
from typing import Optional
from bumble import gatt
from bumble import gatt_client
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
class SirkType(enum.IntEnum):
'''Coordinated Set Identification Service - 5.1 Set Identity Resolving Key.'''
ENCRYPTED = 0x00
PLAINTEXT = 0x01
class MemberLock(enum.IntEnum):
'''Coordinated Set Identification Service - 5.3 Set Member Lock.'''
UNLOCKED = 0x01
LOCKED = 0x02
# -----------------------------------------------------------------------------
# Utils
# -----------------------------------------------------------------------------
# TODO: Implement RSI Generator
# -----------------------------------------------------------------------------
# Server
# -----------------------------------------------------------------------------
class CoordinatedSetIdentificationService(gatt.TemplateService):
UUID = gatt.GATT_COORDINATED_SET_IDENTIFICATION_SERVICE
set_identity_resolving_key_characteristic: gatt.Characteristic
coordinated_set_size_characteristic: Optional[gatt.Characteristic] = None
set_member_lock_characteristic: Optional[gatt.Characteristic] = None
set_member_rank_characteristic: Optional[gatt.Characteristic] = None
def __init__(
self,
set_identity_resolving_key: bytes,
coordinated_set_size: Optional[int] = None,
set_member_lock: Optional[MemberLock] = None,
set_member_rank: Optional[int] = None,
) -> None:
characteristics = []
self.set_identity_resolving_key_characteristic = gatt.Characteristic(
uuid=gatt.GATT_SET_IDENTITY_RESOLVING_KEY_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.NOTIFY,
permissions=gatt.Characteristic.Permissions.READABLE,
# TODO: Implement encrypted SIRK reader.
value=struct.pack('B', SirkType.PLAINTEXT) + set_identity_resolving_key,
)
characteristics.append(self.set_identity_resolving_key_characteristic)
if coordinated_set_size is not None:
self.coordinated_set_size_characteristic = gatt.Characteristic(
uuid=gatt.GATT_COORDINATED_SET_SIZE_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.NOTIFY,
permissions=gatt.Characteristic.Permissions.READABLE,
value=struct.pack('B', coordinated_set_size),
)
characteristics.append(self.coordinated_set_size_characteristic)
if set_member_lock is not None:
self.set_member_lock_characteristic = gatt.Characteristic(
uuid=gatt.GATT_SET_MEMBER_LOCK_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.NOTIFY
| gatt.Characteristic.Properties.WRITE,
permissions=gatt.Characteristic.Permissions.READABLE
| gatt.Characteristic.Permissions.WRITEABLE,
value=struct.pack('B', set_member_lock),
)
characteristics.append(self.set_member_lock_characteristic)
if set_member_rank is not None:
self.set_member_rank_characteristic = gatt.Characteristic(
uuid=gatt.GATT_SET_MEMBER_RANK_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.NOTIFY,
permissions=gatt.Characteristic.Permissions.READABLE,
value=struct.pack('B', set_member_rank),
)
characteristics.append(self.set_member_rank_characteristic)
super().__init__(characteristics)
# -----------------------------------------------------------------------------
# Client
# -----------------------------------------------------------------------------
class CoordinatedSetIdentificationProxy(gatt_client.ProfileServiceProxy):
SERVICE_CLASS = CoordinatedSetIdentificationService
set_identity_resolving_key: gatt_client.CharacteristicProxy
coordinated_set_size: Optional[gatt_client.CharacteristicProxy] = None
set_member_lock: Optional[gatt_client.CharacteristicProxy] = None
set_member_rank: Optional[gatt_client.CharacteristicProxy] = None
def __init__(self, service_proxy: gatt_client.ServiceProxy) -> None:
self.service_proxy = service_proxy
self.set_identity_resolving_key = service_proxy.get_characteristics_by_uuid(
gatt.GATT_SET_IDENTITY_RESOLVING_KEY_CHARACTERISTIC
)[0]
if characteristics := service_proxy.get_characteristics_by_uuid(
gatt.GATT_COORDINATED_SET_SIZE_CHARACTERISTIC
):
self.coordinated_set_size = characteristics[0]
if characteristics := service_proxy.get_characteristics_by_uuid(
gatt.GATT_SET_MEMBER_LOCK_CHARACTERISTIC
):
self.set_member_lock = characteristics[0]
if characteristics := service_proxy.get_characteristics_by_uuid(
gatt.GATT_SET_MEMBER_RANK_CHARACTERISTIC
):
self.set_member_rank = characteristics[0]
+3 -2
View File
@@ -889,7 +889,8 @@ class Client:
multiplexer: Optional[Multiplexer]
l2cap_channel: Optional[l2cap.ClassicChannel]
def __init__(self, connection: Connection) -> None:
def __init__(self, device: Device, connection: Connection) -> None:
self.device = device
self.connection = connection
self.l2cap_channel = None
self.multiplexer = None
@@ -905,7 +906,7 @@ class Client:
raise
assert self.l2cap_channel is not None
# Create a multiplexer to manage DLCs with the server
# Create a mutliplexer to manage DLCs with the server
self.multiplexer = Multiplexer(self.l2cap_channel, Multiplexer.Role.INITIATOR)
# Connect the multiplexer
+4 -4
View File
@@ -760,13 +760,13 @@ class SDP_ServiceSearchAttributeResponse(SDP_PDU):
class Client:
channel: Optional[l2cap.ClassicChannel]
def __init__(self, connection: Connection) -> None:
self.connection = connection
def __init__(self, device: Device) -> None:
self.device = device
self.pending_request = None
self.channel = None
async def connect(self) -> None:
self.channel = await self.connection.create_l2cap_channel(
async def connect(self, connection: Connection) -> None:
self.channel = await connection.create_l2cap_channel(
spec=l2cap.ClassicChannelSpec(SDP_PSM)
)
+68 -214
View File
@@ -27,7 +27,6 @@ import logging
import asyncio
import enum
import secrets
from dataclasses import dataclass
from typing import (
TYPE_CHECKING,
Any,
@@ -54,7 +53,6 @@ from .core import (
BT_BR_EDR_TRANSPORT,
BT_CENTRAL_ROLE,
BT_LE_TRANSPORT,
AdvertisingData,
ProtocolError,
name_or_number,
)
@@ -187,8 +185,8 @@ SMP_KEYPRESS_AUTHREQ = 0b00010000
SMP_CT2_AUTHREQ = 0b00100000
# Crypto salt
SMP_CTKD_H7_LEBR_SALT = bytes.fromhex('000000000000000000000000746D7031')
SMP_CTKD_H7_BRLE_SALT = bytes.fromhex('000000000000000000000000746D7032')
SMP_CTKD_H7_LEBR_SALT = bytes.fromhex('00000000000000000000000000000000746D7031')
SMP_CTKD_H7_BRLE_SALT = bytes.fromhex('00000000000000000000000000000000746D7032')
# fmt: on
# pylint: enable=line-too-long
@@ -565,54 +563,6 @@ class PairingMethod(enum.IntEnum):
CTKD_OVER_CLASSIC = 4
# -----------------------------------------------------------------------------
class OobContext:
"""Cryptographic context for LE SC OOB pairing."""
ecc_key: crypto.EccKey
r: bytes
def __init__(
self, ecc_key: Optional[crypto.EccKey] = None, r: Optional[bytes] = None
) -> None:
self.ecc_key = crypto.EccKey.generate() if ecc_key is None else ecc_key
self.r = crypto.r() if r is None else r
def share(self) -> OobSharedData:
pkx = self.ecc_key.x[::-1]
return OobSharedData(c=crypto.f4(pkx, pkx, self.r, bytes(1)), r=self.r)
# -----------------------------------------------------------------------------
class OobLegacyContext:
"""Cryptographic context for LE Legacy OOB pairing."""
tk: bytes
def __init__(self, tk: Optional[bytes] = None) -> None:
self.tk = crypto.r() if tk is None else tk
# -----------------------------------------------------------------------------
@dataclass
class OobSharedData:
"""Shareable data for LE SC OOB pairing."""
c: bytes
r: bytes
def to_ad(self) -> AdvertisingData:
return AdvertisingData(
[
(AdvertisingData.LE_SECURE_CONNECTIONS_CONFIRMATION_VALUE, self.c),
(AdvertisingData.LE_SECURE_CONNECTIONS_RANDOM_VALUE, self.r),
]
)
def __str__(self) -> str:
return f'OOB(C={self.c.hex()}, R={self.r.hex()})'
# -----------------------------------------------------------------------------
class Session:
# I/O Capability to pairing method decision matrix
@@ -677,13 +627,6 @@ class Session:
},
}
ea: bytes
eb: bytes
ltk: bytes
preq: bytes
pres: bytes
tk: bytes
def __init__(
self,
manager: Manager,
@@ -693,10 +636,17 @@ class Session:
) -> None:
self.manager = manager
self.connection = connection
self.preq: Optional[bytes] = None
self.pres: Optional[bytes] = None
self.ea = None
self.eb = None
self.tk = bytes(16)
self.r = bytes(16)
self.stk = None
self.ltk = None
self.ltk_ediv = 0
self.ltk_rand = bytes(8)
self.link_key: Optional[bytes] = None
self.link_key = None
self.initiator_key_distribution: int = 0
self.responder_key_distribution: int = 0
self.peer_random_value: Optional[bytes] = None
@@ -709,7 +659,7 @@ class Session:
self.peer_bd_addr: Optional[Address] = None
self.peer_signature_key = None
self.peer_expected_distributions: List[Type[SMP_Command]] = []
self.dh_key = b''
self.dh_key = None
self.confirm_value = None
self.passkey: Optional[int] = None
self.passkey_ready = asyncio.Event()
@@ -762,8 +712,8 @@ class Session:
self.io_capability = pairing_config.delegate.io_capability
self.peer_io_capability = SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY
# OOB
self.oob_data_flag = 0 if pairing_config.oob is None else 1
# OOB (not supported yet)
self.oob = False
# Set up addresses
self_address = connection.self_address
@@ -779,35 +729,9 @@ class Session:
self.ia = bytes(peer_address)
self.iat = 1 if peer_address.is_random else 0
# Select the ECC key, TK and r initial value
if pairing_config.oob:
self.peer_oob_data = pairing_config.oob.peer_data
if pairing_config.sc:
if pairing_config.oob.our_context is None:
raise ValueError(
"oob pairing config requires a context when sc is True"
)
self.r = pairing_config.oob.our_context.r
self.ecc_key = pairing_config.oob.our_context.ecc_key
if pairing_config.oob.legacy_context is not None:
self.tk = pairing_config.oob.legacy_context.tk
else:
if pairing_config.oob.legacy_context is None:
raise ValueError(
"oob pairing config requires a legacy context when sc is False"
)
self.r = bytes(16)
self.ecc_key = manager.ecc_key
self.tk = pairing_config.oob.legacy_context.tk
else:
self.peer_oob_data = None
self.r = bytes(16)
self.ecc_key = manager.ecc_key
self.tk = bytes(16)
@property
def pkx(self) -> Tuple[bytes, bytes]:
return (self.ecc_key.x[::-1], self.peer_public_key_x)
return (bytes(reversed(self.manager.ecc_key.x)), self.peer_public_key_x)
@property
def pka(self) -> bytes:
@@ -844,10 +768,7 @@ class Session:
return None
def decide_pairing_method(
self,
auth_req: int,
initiator_io_capability: int,
responder_io_capability: int,
self, auth_req: int, initiator_io_capability: int, responder_io_capability: int
) -> None:
if self.connection.transport == BT_BR_EDR_TRANSPORT:
self.pairing_method = PairingMethod.CTKD_OVER_CLASSIC
@@ -988,7 +909,7 @@ class Session:
command = SMP_Pairing_Request_Command(
io_capability=self.io_capability,
oob_data_flag=self.oob_data_flag,
oob_data_flag=0,
auth_req=self.auth_req,
maximum_encryption_key_size=16,
initiator_key_distribution=self.initiator_key_distribution,
@@ -1000,7 +921,7 @@ class Session:
def send_pairing_response_command(self) -> None:
response = SMP_Pairing_Response_Command(
io_capability=self.io_capability,
oob_data_flag=self.oob_data_flag,
oob_data_flag=0,
auth_req=self.auth_req,
maximum_encryption_key_size=16,
initiator_key_distribution=self.initiator_key_distribution,
@@ -1061,8 +982,8 @@ class Session:
def send_public_key_command(self) -> None:
self.send_command(
SMP_Pairing_Public_Key_Command(
public_key_x=self.ecc_key.x[::-1],
public_key_y=self.ecc_key.y[::-1],
public_key_x=bytes(reversed(self.manager.ecc_key.x)),
public_key_y=bytes(reversed(self.manager.ecc_key.y)),
)
)
@@ -1098,54 +1019,18 @@ class Session:
)
)
@classmethod
def derive_ltk(cls, link_key: bytes, ct2: bool) -> bytes:
'''Derives Long Term Key from Link Key.
Args:
link_key: BR/EDR Link Key bytes in little-endian.
ct2: whether ct2 is supported on both devices.
Returns:
LE Long Tern Key bytes in little-endian.
'''
async def derive_ltk(self) -> None:
link_key = await self.manager.device.get_link_key(self.connection.peer_address)
assert link_key is not None
ilk = (
crypto.h7(salt=SMP_CTKD_H7_BRLE_SALT, w=link_key)
if ct2
if self.ct2
else crypto.h6(link_key, b'tmp2')
)
return crypto.h6(ilk, b'brle')
@classmethod
def derive_link_key(cls, ltk: bytes, ct2: bool) -> bytes:
'''Derives Link Key from Long Term Key.
Args:
ltk: LE Long Term Key bytes in little-endian.
ct2: whether ct2 is supported on both devices.
Returns:
BR/EDR Link Key bytes in little-endian.
'''
ilk = (
crypto.h7(salt=SMP_CTKD_H7_LEBR_SALT, w=ltk)
if ct2
else crypto.h6(ltk, b'tmp1')
)
return crypto.h6(ilk, b'lebr')
async def get_link_key_and_derive_ltk(self) -> None:
'''Retrieves BR/EDR Link Key from storage and derive it to LE LTK.'''
link_key = await self.manager.device.get_link_key(self.connection.peer_address)
if link_key is None:
logging.warning(
'Try to derive LTK but host does not have the LK. Send a SMP_PAIRING_FAILED but the procedure will not be paused!'
)
self.send_pairing_failed(
SMP_CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED_ERROR
)
else:
self.ltk = self.derive_ltk(link_key, self.ct2)
self.ltk = crypto.h6(ilk, b'brle')
def distribute_keys(self) -> None:
# Distribute the keys as required
if self.is_initiator:
# CTKD: Derive LTK from LinkKey
@@ -1154,7 +1039,7 @@ class Session:
and self.initiator_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG
):
self.ctkd_task = self.connection.abort_on(
'disconnection', self.get_link_key_and_derive_ltk()
'disconnection', self.derive_ltk()
)
elif not self.sc:
# Distribute the LTK, EDIV and RAND
@@ -1184,7 +1069,12 @@ class Session:
# CTKD, calculate BR/EDR link key
if self.initiator_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG:
self.link_key = self.derive_link_key(self.ltk, self.ct2)
ilk = (
crypto.h7(salt=SMP_CTKD_H7_LEBR_SALT, w=self.ltk)
if self.ct2
else crypto.h6(self.ltk, b'tmp1')
)
self.link_key = crypto.h6(ilk, b'lebr')
else:
# CTKD: Derive LTK from LinkKey
@@ -1193,7 +1083,7 @@ class Session:
and self.responder_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG
):
self.ctkd_task = self.connection.abort_on(
'disconnection', self.get_link_key_and_derive_ltk()
'disconnection', self.derive_ltk()
)
# Distribute the LTK, EDIV and RAND
elif not self.sc:
@@ -1223,7 +1113,12 @@ class Session:
# CTKD, calculate BR/EDR link key
if self.responder_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG:
self.link_key = self.derive_link_key(self.ltk, self.ct2)
ilk = (
crypto.h7(salt=SMP_CTKD_H7_LEBR_SALT, w=self.ltk)
if self.ct2
else crypto.h6(self.ltk, b'tmp1')
)
self.link_key = crypto.h6(ilk, b'lebr')
def compute_peer_expected_distributions(self, key_distribution_flags: int) -> None:
# Set our expectations for what to wait for in the key distribution phase
@@ -1401,7 +1296,7 @@ class Session:
try:
handler(command)
except Exception as error:
logger.exception(f'{color("!!! Exception in handler:", "red")} {error}')
logger.warning(f'{color("!!! Exception in handler:", "red")} {error}')
response = SMP_Pairing_Failed_Command(
reason=SMP_UNSPECIFIED_REASON_ERROR
)
@@ -1438,28 +1333,15 @@ class Session:
self.sc = self.sc and (command.auth_req & SMP_SC_AUTHREQ != 0)
self.ct2 = self.ct2 and (command.auth_req & SMP_CT2_AUTHREQ != 0)
# Infer the pairing method
if (self.sc and (self.oob_data_flag != 0 or command.oob_data_flag != 0)) or (
not self.sc and (self.oob_data_flag != 0 and command.oob_data_flag != 0)
):
# Use OOB
self.pairing_method = PairingMethod.OOB
if not self.sc and self.tk is None:
# For legacy OOB, TK is required.
logger.warning("legacy OOB without TK")
self.send_pairing_failed(SMP_OOB_NOT_AVAILABLE_ERROR)
return
if command.oob_data_flag == 0:
# The peer doesn't have OOB data, use r=0
self.r = bytes(16)
else:
# Decide which pairing method to use from the IO capability
self.decide_pairing_method(
command.auth_req,
command.io_capability,
self.io_capability,
)
# Check for OOB
if command.oob_data_flag != 0:
self.send_pairing_failed(SMP_OOB_NOT_AVAILABLE_ERROR)
return
# Decide which pairing method to use
self.decide_pairing_method(
command.auth_req, command.io_capability, self.io_capability
)
logger.debug(f'pairing method: {self.pairing_method.name}')
# Key distribution
@@ -1508,26 +1390,15 @@ class Session:
self.bonding = self.bonding and (command.auth_req & SMP_BONDING_AUTHREQ != 0)
self.sc = self.sc and (command.auth_req & SMP_SC_AUTHREQ != 0)
# Infer the pairing method
if (self.sc and (self.oob_data_flag != 0 or command.oob_data_flag != 0)) or (
not self.sc and (self.oob_data_flag != 0 and command.oob_data_flag != 0)
):
# Use OOB
self.pairing_method = PairingMethod.OOB
if not self.sc and self.tk is None:
# For legacy OOB, TK is required.
logger.warning("legacy OOB without TK")
self.send_pairing_failed(SMP_OOB_NOT_AVAILABLE_ERROR)
return
if command.oob_data_flag == 0:
# The peer doesn't have OOB data, use r=0
self.r = bytes(16)
else:
# Decide which pairing method to use from the IO capability
self.decide_pairing_method(
command.auth_req, self.io_capability, command.io_capability
)
# Check for OOB
if self.sc and command.oob_data_flag:
self.send_pairing_failed(SMP_OOB_NOT_AVAILABLE_ERROR)
return
# Decide which pairing method to use
self.decide_pairing_method(
command.auth_req, self.io_capability, command.io_capability
)
logger.debug(f'pairing method: {self.pairing_method.name}')
# Key distribution
@@ -1678,13 +1549,12 @@ class Session:
if self.passkey_step < 20:
self.send_pairing_confirm_command()
return
elif self.pairing_method != PairingMethod.OOB:
else:
return
else:
if self.pairing_method in (
PairingMethod.JUST_WORKS,
PairingMethod.NUMERIC_COMPARISON,
PairingMethod.OOB,
):
self.send_pairing_random_command()
elif self.pairing_method == PairingMethod.PASSKEY:
@@ -1721,7 +1591,6 @@ class Session:
if self.pairing_method in (
PairingMethod.JUST_WORKS,
PairingMethod.NUMERIC_COMPARISON,
PairingMethod.OOB,
):
ra = bytes(16)
rb = ra
@@ -1730,6 +1599,7 @@ class Session:
ra = self.passkey.to_bytes(16, byteorder='little')
rb = ra
else:
# OOB not implemented yet
return
assert self.preq and self.pres
@@ -1781,33 +1651,18 @@ class Session:
self.peer_public_key_y = command.public_key_y
# Compute the DH key
self.dh_key = self.ecc_key.dh(
command.public_key_x[::-1],
command.public_key_y[::-1],
)[::-1]
self.dh_key = bytes(
reversed(
self.manager.ecc_key.dh(
bytes(reversed(command.public_key_x)),
bytes(reversed(command.public_key_y)),
)
)
)
logger.debug(f'DH key: {self.dh_key.hex()}')
if self.pairing_method == PairingMethod.OOB:
# Check against shared OOB data
if self.peer_oob_data:
confirm_verifier = crypto.f4(
self.peer_public_key_x,
self.peer_public_key_x,
self.peer_oob_data.r,
bytes(1),
)
if not self.check_expected_value(
self.peer_oob_data.c,
confirm_verifier,
SMP_CONFIRM_VALUE_FAILED_ERROR,
):
return
if self.is_initiator:
if self.pairing_method == PairingMethod.OOB:
self.send_pairing_random_command()
else:
self.send_pairing_confirm_command()
self.send_pairing_confirm_command()
else:
if self.pairing_method == PairingMethod.PASSKEY:
self.display_or_input_passkey()
@@ -1818,7 +1673,6 @@ class Session:
if self.pairing_method in (
PairingMethod.JUST_WORKS,
PairingMethod.NUMERIC_COMPARISON,
PairingMethod.OOB,
):
# We can now send the confirmation value
self.send_pairing_confirm_command()
@@ -1847,6 +1701,7 @@ class Session:
else:
self.send_pairing_dhkey_check_command()
else:
assert self.ltk
self.start_encryption(self.ltk)
def on_smp_pairing_failed_command(
@@ -1896,7 +1751,6 @@ class Manager(EventEmitter):
sessions: Dict[int, Session]
pairing_config_factory: Callable[[Connection], PairingConfig]
session_proxy: Type[Session]
_ecc_key: Optional[crypto.EccKey]
def __init__(
self,
+1 -1
View File
@@ -150,7 +150,7 @@ class PacketParser:
try:
self.sink.on_packet(bytes(self.packet))
except Exception as error:
logger.exception(
logger.warning(
color(f'!!! Exception in on_packet: {error}', 'red')
)
self.reset()
+61 -58
View File
@@ -24,10 +24,9 @@ import platform
import usb1
from bumble.transport.common import Transport, ParserSource
from bumble import hci
from bumble.colors import color
from bumble.utils import AsyncRunner
from .common import Transport, ParserSource
from .. import hci
from ..colors import color
# -----------------------------------------------------------------------------
@@ -114,7 +113,7 @@ async def open_usb_transport(spec: str) -> Transport:
def __init__(self, device, acl_out):
self.device = device
self.acl_out = acl_out
self.acl_out_transfer = device.getTransfer()
self.transfer = device.getTransfer()
self.packets = collections.deque() # Queue of packets waiting to be sent
self.loop = asyncio.get_running_loop()
self.cancel_done = self.loop.create_future()
@@ -138,20 +137,21 @@ async def open_usb_transport(spec: str) -> Transport:
# The queue was previously empty, re-prime the pump
self.process_queue()
def transfer_callback(self, transfer):
def on_packet_sent(self, transfer):
status = transfer.getStatus()
# logger.debug(f'<<< USB out transfer callback: status={status}')
# pylint: disable=no-member
if status == usb1.TRANSFER_COMPLETED:
self.loop.call_soon_threadsafe(self.on_packet_sent)
self.loop.call_soon_threadsafe(self.on_packet_sent_)
elif status == usb1.TRANSFER_CANCELLED:
self.loop.call_soon_threadsafe(self.cancel_done.set_result, None)
else:
logger.warning(
color(f'!!! OUT transfer not completed: status={status}', 'red')
color(f'!!! out transfer not completed: status={status}', 'red')
)
def on_packet_sent(self):
def on_packet_sent_(self):
if self.packets:
self.packets.popleft()
self.process_queue()
@@ -163,20 +163,22 @@ async def open_usb_transport(spec: str) -> Transport:
packet = self.packets[0]
packet_type = packet[0]
if packet_type == hci.HCI_ACL_DATA_PACKET:
self.acl_out_transfer.setBulk(
self.acl_out, packet[1:], callback=self.transfer_callback
self.transfer.setBulk(
self.acl_out, packet[1:], callback=self.on_packet_sent
)
self.acl_out_transfer.submit()
logger.debug('submit ACL')
self.transfer.submit()
elif packet_type == hci.HCI_COMMAND_PACKET:
self.acl_out_transfer.setControl(
self.transfer.setControl(
USB_RECIPIENT_DEVICE | USB_REQUEST_TYPE_CLASS,
0,
0,
0,
packet[1:],
callback=self.transfer_callback,
callback=self.on_packet_sent,
)
self.acl_out_transfer.submit()
logger.debug('submit COMMAND')
self.transfer.submit()
else:
logger.warning(color(f'unsupported packet type {packet_type}', 'red'))
@@ -191,11 +193,11 @@ async def open_usb_transport(spec: str) -> Transport:
self.packets.clear()
# If we have a transfer in flight, cancel it
if self.acl_out_transfer.isSubmitted():
if self.transfer.isSubmitted():
# Try to cancel the transfer, but that may fail because it may have
# already completed
try:
self.acl_out_transfer.cancel()
self.transfer.cancel()
logger.debug('waiting for OUT transfer cancellation to be done...')
await self.cancel_done
@@ -204,22 +206,27 @@ async def open_usb_transport(spec: str) -> Transport:
logger.debug('OUT transfer likely already completed')
class UsbPacketSource(asyncio.Protocol, ParserSource):
def __init__(self, device, metadata, acl_in, events_in):
def __init__(self, context, device, metadata, acl_in, events_in):
super().__init__()
self.context = context
self.device = device
self.metadata = metadata
self.acl_in = acl_in
self.acl_in_transfer = None
self.events_in = events_in
self.events_in_transfer = None
self.loop = asyncio.get_running_loop()
self.queue = asyncio.Queue()
self.dequeue_task = None
self.closed = False
self.event_loop_done = self.loop.create_future()
self.cancel_done = {
hci.HCI_EVENT_PACKET: self.loop.create_future(),
hci.HCI_ACL_DATA_PACKET: self.loop.create_future(),
}
self.closed = False
self.events_in_transfer = None
self.acl_in_transfer = None
# Create a thread to process events
self.event_thread = threading.Thread(target=self.run)
def start(self):
# Set up transfer objects for input
@@ -227,7 +234,7 @@ async def open_usb_transport(spec: str) -> Transport:
self.events_in_transfer.setInterrupt(
self.events_in,
READ_SIZE,
callback=self.transfer_callback,
callback=self.on_packet_received,
user_data=hci.HCI_EVENT_PACKET,
)
self.events_in_transfer.submit()
@@ -236,23 +243,22 @@ async def open_usb_transport(spec: str) -> Transport:
self.acl_in_transfer.setBulk(
self.acl_in,
READ_SIZE,
callback=self.transfer_callback,
callback=self.on_packet_received,
user_data=hci.HCI_ACL_DATA_PACKET,
)
self.acl_in_transfer.submit()
self.dequeue_task = self.loop.create_task(self.dequeue())
self.event_thread.start()
@property
def usb_transfer_submitted(self):
return (
self.events_in_transfer.isSubmitted()
or self.acl_in_transfer.isSubmitted()
)
def transfer_callback(self, transfer):
def on_packet_received(self, transfer):
packet_type = transfer.getUserData()
status = transfer.getStatus()
# logger.debug(
# f'<<< USB IN transfer callback: status={status} '
# f'packet_type={packet_type} '
# f'length={transfer.getActualLength()}'
# )
# pylint: disable=no-member
if status == usb1.TRANSFER_COMPLETED:
@@ -261,18 +267,18 @@ async def open_usb_transport(spec: str) -> Transport:
+ transfer.getBuffer()[: transfer.getActualLength()]
)
self.loop.call_soon_threadsafe(self.queue.put_nowait, packet)
# Re-submit the transfer so we can receive more data
transfer.submit()
elif status == usb1.TRANSFER_CANCELLED:
self.loop.call_soon_threadsafe(
self.cancel_done[packet_type].set_result, None
)
return
else:
logger.warning(
color(f'!!! IN transfer not completed: status={status}', 'red')
color(f'!!! transfer not completed: status={status}', 'red')
)
self.loop.call_soon_threadsafe(self.on_transport_lost)
# Re-submit the transfer so we can receive more data
transfer.submit()
async def dequeue(self):
while not self.closed:
@@ -282,6 +288,21 @@ async def open_usb_transport(spec: str) -> Transport:
return
self.parser.feed_data(packet)
def run(self):
logger.debug('starting USB event loop')
while (
self.events_in_transfer.isSubmitted()
or self.acl_in_transfer.isSubmitted()
):
# pylint: disable=no-member
try:
self.context.handleEvents()
except usb1.USBErrorInterrupted:
pass
logger.debug('USB event loop done')
self.loop.call_soon_threadsafe(self.event_loop_done.set_result, None)
def close(self):
self.closed = True
@@ -310,14 +331,15 @@ async def open_usb_transport(spec: str) -> Transport:
f'IN[{packet_type}] transfer likely already completed'
)
# Wait for the thread to terminate
await self.event_loop_done
class UsbTransport(Transport):
def __init__(self, context, device, interface, setting, source, sink):
super().__init__(source, sink)
self.context = context
self.device = device
self.interface = interface
self.loop = asyncio.get_running_loop()
self.event_loop_done = self.loop.create_future()
# Get exclusive access
device.claimInterface(interface)
@@ -330,22 +352,6 @@ async def open_usb_transport(spec: str) -> Transport:
source.start()
sink.start()
# Create a thread to process events
self.event_thread = threading.Thread(target=self.run)
self.event_thread.start()
def run(self):
logger.debug('starting USB event loop')
while self.source.usb_transfer_submitted:
# pylint: disable=no-member
try:
self.context.handleEvents()
except usb1.USBErrorInterrupted:
pass
logger.debug('USB event loop done')
self.loop.call_soon_threadsafe(self.event_loop_done.set_result, None)
async def close(self):
self.source.close()
self.sink.close()
@@ -355,9 +361,6 @@ async def open_usb_transport(spec: str) -> Transport:
self.device.close()
self.context.close()
# Wait for the thread to terminate
await self.event_loop_done
# Find the device according to the spec moniker
load_libusb()
context = usb1.USBContext()
@@ -537,7 +540,7 @@ async def open_usb_transport(spec: str) -> Transport:
except usb1.USBError:
logger.warning('failed to set configuration')
source = UsbPacketSource(device, device_metadata, acl_in, events_in)
source = UsbPacketSource(context, device, device_metadata, acl_in, events_in)
sink = UsbPacketSink(device, acl_out)
return UsbTransport(context, device, interface, setting, source, sink)
except usb1.USBError as error:
+1 -17
View File
@@ -432,7 +432,7 @@ def wrap_async(function):
def deprecated(msg: str):
"""
Throw deprecation warning before execution.
Throw deprecation warning before execution
"""
def wrapper(function):
@@ -444,19 +444,3 @@ def deprecated(msg: str):
return inner
return wrapper
def experimental(msg: str):
"""
Throws a future warning before execution.
"""
def wrapper(function):
@wraps(function)
def inner(*args, **kwargs):
warnings.warn(msg, FutureWarning)
return function(*args, **kwargs)
return inner
return wrapper
-1
View File
@@ -70,7 +70,6 @@ nav:
- Extras:
- extras/index.md
- Android Remote HCI: extras/android_remote_hci.md
- Android BT Bench: extras/android_bt_bench.md
- Hive:
- hive/index.md
- Speaker: hive/web/speaker/speaker.html
@@ -1,64 +0,0 @@
ANDROID BENCH APP
=================
This Android app that is compatible with the Bumble `bench` command line app.
This app can be used to test the throughput and latency between two Android
devices, or between an Android device and another device running the Bumble
`bench` app.
Only the RFComm Client, RFComm Server, L2CAP Client and L2CAP Server modes are
supported.
Building
--------
You can build the app by running `./gradlew build` (use `gradlew.bat` on Windows) from the `BtBench` top level directory.
You can also build with Android Studio: open the `BtBench` project. You can build and/or debug from there.
If the build succeeds, you can find the app APKs (debug and release) at:
* [Release] ``app/build/outputs/apk/release/app-release-unsigned.apk``
* [Debug] ``app/build/outputs/apk/debug/app-debug.apk``
Running
-------
### Starting the app
You can start the app from the Android launcher, from Android Studio, or with `adb`
#### Launching from the launcher
Just tap the app icon on the launcher, check the parameters, and tap
one of the benchmark action buttons.
#### Launching with `adb`
Using the `am` command, you can start the activity, and pass it arguments so that you can
automatically start the benchmark test, and/or set the parameters.
| Parameter Name | Parameter Type | Description
|------------------------|----------------|------------
| autostart | String | Benchmark to start. (rfcomm-client, rfcomm-server, l2cap-client or l2cap-server)
| packet-count | Integer | Number of packets to send (rfcomm-client and l2cap-client only)
| packet-size | Integer | Number of bytes per packet (rfcomm-client and l2cap-client only)
| peer-bluetooth-address | Integer | Peer Bluetooth address to connect to (rfcomm-client and l2cap-client | only)
!!! tip "Launching from adb with auto-start"
In this example, we auto-start the Rfcomm Server bench action.
```bash
$ adb shell am start -n com.github.google.bumble.btbench/.MainActivity --es autostart rfcomm-server
```
!!! tip "Launching from adb with auto-start and some parameters"
In this example, we auto-start the Rfcomm Client bench action, set the packet count to 100,
and the packet size to 1024, and connect to DA:4C:10:DE:17:02
```bash
$ adb shell am start -n com.github.google.bumble.btbench/.MainActivity --es autostart rfcomm-client --ei packet-count 100 --ei packet-size 1024 --es peer-bluetooth-address DA:4C:10:DE:17:02
```
#### Selecting a Peer Bluetooth Address
The app's main activity has a "Peer Bluetooth Address" setting where you can change the address.
!!! note "Bluetooth Address for L2CAP vs RFComm"
For BLE (L2CAP mode), the address of a device typically changes regularly (it is randomized for privacy), whereas the Bluetooth Classic addresses will remain the same (RFComm mode).
If two devices are paired and bonded, then they will each "see" a non-changing address for each other even with BLE (Resolvable Private Address)
+13 -53
View File
@@ -1,19 +1,19 @@
ANDROID REMOTE HCI APP
======================
This application allows using an android phone's built-in Bluetooth controller with
This application allows using an android phone's built-in Bluetooth controller with
a Bumble host stack running outside the phone (typically a development laptop or desktop).
The app runs an HCI proxy between a TCP socket on the "outside" and the Bluetooth HCI HAL
on the "inside". (See [this page](https://source.android.com/docs/core/connect/bluetooth) for a high level
on the "inside". (See [this page](https://source.android.com/docs/core/connect/bluetooth) for a high level
description of the Android Bluetooth HCI HAL).
The HCI packets received on the TCP socket are forwarded to the phone's controller, and the
The HCI packets received on the TCP socket are forwarded to the phone's controller, and the
packets coming from the controller are forwarded to the TCP socket.
Building
--------
You can build the app by running `./gradlew build` (use `gradlew.bat` on Windows) from the `extras/android/RemoteHCI` top level directory.
You can build the app by running `./gradlew build` (use `gradlew.bat` on Windows) from the `RemoteHCI` top level directory.
You can also build with Android Studio: open the `RemoteHCI` project. You can build and/or debug from there.
If the build succeeds, you can find the app APKs (debug and release) at:
@@ -25,23 +25,9 @@ If the build succeeds, you can find the app APKs (debug and release) at:
Running
-------
!!! note
In the following examples, it is assumed that shell commands are executed while in the
app's root directory, `extras/android/RemoteHCI`. If you are in a different directory,
adjust the relative paths accordingly.
### Preconditions
When the proxy starts (tapping the "Start" button in the app's main activity, or running the proxy
from an `adb shell` command line), it will try to bind to the Bluetooth HAL.
This requires that there is no other HAL client, and requires certain privileges.
For running as a regular app, this requires disabling SELinux temporarily.
For running as a command-line executable, this just requires a root shell.
#### Root Shell
!!! tip "Restart `adb` as root"
```bash
$ adb root
```
When the proxy starts (tapping the "Start" button in the app's main activity), it will try to
bind to the Bluetooth HAL. This requires disabling SELinux temporarily, and being the only HAL client.
#### Disabling SELinux
Binding to the Bluetooth HCI HAL requires certain SELinux permissions that can't simply be changed
@@ -70,8 +56,8 @@ development phone).
This state will also reset to the normal SELinux enforcement when you reboot.
#### Stopping the bluetooth process
Since the Bluetooth HAL service can only accept one client, and that in normal conditions
that client is the Android's bluetooth stack, it is required to first shut down the
Since the Bluetooth HAL service can only accept one client, and that in normal conditions
that client is the Android's bluetooth stack, it is required to first shut down the
Android bluetooth stack process.
!!! tip "Checking if the Bluetooth process is running"
@@ -93,33 +79,7 @@ Airplane Mode, then rebooting. The bluetooth process should, in theory, not rest
$ adb shell cmd bluetooth_manager disable
```
### Running as a command line app
You push the built APK to a temporary location on the phone's filesystem, then launch the command
line executable with an `adb shell` command.
!!! tip "Pushing the executable"
```bash
$ adb push app/build/outputs/apk/release/app-release-unsigned.apk /data/local/tmp/remotehci.apk
```
Do this every time you rebuild. Alternatively, you can push the `debug` APK instead:
```bash
$ adb push app/build/outputs/apk/debug/app-debug.apk /data/local/tmp/remotehci.apk
```
!!! tip "Start the proxy from the command line"
```bash
adb shell "CLASSPATH=/data/local/tmp/remotehci.apk app_process /system/bin com.github.google.bumble.remotehci.CommandLineInterface"
```
This will run the proxy, listening on the default TCP port.
If you want a different port, pass it as a command line parameter
!!! tip "Start the proxy from the command line with a specific TCP port"
```bash
adb shell "CLASSPATH=/data/local/tmp/remotehci.apk app_process /system/bin com.github.google.bumble.remotehci.CommandLineInterface 12345"
```
### Running as a normal app
### Starting the app
You can start the app from the Android launcher, from Android Studio, or with `adb`
#### Launching from the launcher
@@ -143,11 +103,11 @@ automatically start the proxy, and/or set the port number.
#### Selecting a TCP port
The RemoteHCI app's main activity has a "TCP Port" setting where you can change the port on
which the proxy is accepting connections. If the default value isn't suitable, you can
which the proxy is accepting connections. If the default value isn't suitable, you can
change it there (you can also use the special value 0 to let the OS assign a port number for you).
### Connecting to the proxy
To connect the Bumble stack to the proxy, you need to be able to reach the phone's network
To connect the Bumble stack to the proxy, you need to be able to reach the phone's network
stack. This can be done over the phone's WiFi connection, or, alternatively, using an `adb`
TCP forward (which should be faster than over WiFi).
@@ -156,7 +116,7 @@ TCP forward (which should be faster than over WiFi).
```bash
$ adb forward tcp:<outside-port> tcp:<inside-port>
```
Where ``<outside-port>`` is the port number for a listening socket on your laptop or
Where ``<outside-port>`` is the port number for a listening socket on your laptop or
desktop machine, and <inside-port> is the TCP port selected in the app's user interface.
Those two ports may be the same, of course.
For example, with the default TCP port 9993:
@@ -165,7 +125,7 @@ TCP forward (which should be faster than over WiFi).
```
Once you've ensured that you can reach the proxy's TCP port on the phone, either directly or
via an `adb` forward, you can then use it as a Bumble transport, using the transport name:
via an `adb` forward, you can then use it as a Bumble transport, using the transport name:
``tcp-client:<host>:<port>`` syntax.
!!! example "Connecting a Bumble client"
+1 -9
View File
@@ -8,12 +8,4 @@ Android Remote HCI
Allows using an Android phone's built-in Bluetooth controller with a Bumble
stack running on a development machine.
See [Android Remote HCI](android_remote_hci.md) for details.
Android BT Bench
----------------
An Android app that is compatible with the Bumble `bench` command line app.
This app can be used to test the throughput and latency between two Android
devices, or between an Android device and another device running the Bumble
`bench` app.
See [Android Remote HCI](android_remote_hci.md) for details.
-5
View File
@@ -1,5 +0,0 @@
{
"name": "Bumble-LEA",
"keystore": "JsonKeyStore",
"advertising_interval": 100
}
+4 -4
View File
@@ -53,10 +53,10 @@ def sdp_records():
# -----------------------------------------------------------------------------
# pylint: disable-next=too-many-nested-blocks
async def find_a2dp_service(connection):
async def find_a2dp_service(device, connection):
# Connect to the SDP Server
sdp_client = SDP_Client(connection)
await sdp_client.connect()
sdp_client = SDP_Client(device)
await sdp_client.connect(connection)
# Search for services with an Audio Sink service class
search_result = await sdp_client.search_attributes(
@@ -177,7 +177,7 @@ async def main():
print('*** Encryption on')
# Look for an A2DP service
avdtp_version = await find_a2dp_service(connection)
avdtp_version = await find_a2dp_service(device, connection)
if not avdtp_version:
print(color('!!! no AVDTP service found'))
return
+3 -1
View File
@@ -165,7 +165,9 @@ async def main():
print('*** Encryption on')
# Look for an A2DP service
avdtp_version = await find_avdtp_service_with_connection(connection)
avdtp_version = await find_avdtp_service_with_connection(
device, connection
)
if not avdtp_version:
print(color('!!! no A2DP service found'))
return
-107
View File
@@ -1,107 +0,0 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import sys
import os
from bumble.device import (
Device,
Connection,
)
from bumble.hci import (
OwnAddressType,
HCI_LE_Set_Extended_Advertising_Parameters_Command,
)
from bumble.transport import open_transport_or_link
# -----------------------------------------------------------------------------
async def main() -> None:
if len(sys.argv) < 3:
print(
'Usage: run_cig_setup.py <config-file>'
'<transport-spec-for-device-1> <transport-spec-for-device-2>'
)
print(
'example: run_cig_setup.py device1.json'
'tcp-client:127.0.0.1:6402 tcp-client:127.0.0.1:6402'
)
return
print('<<< connecting to HCI...')
hci_transports = await asyncio.gather(
open_transport_or_link(sys.argv[2]), open_transport_or_link(sys.argv[3])
)
print('<<< connected')
devices = [
Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
for hci_transport in hci_transports
]
devices[0].cis_enabled = True
devices[1].cis_enabled = True
await asyncio.gather(*[device.power_on() for device in devices])
await devices[0].start_extended_advertising(
advertising_properties=(
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING
),
own_address_type=OwnAddressType.PUBLIC,
)
connection = await devices[1].connect(
devices[0].public_address, own_address_type=OwnAddressType.PUBLIC
)
cid_ids = [2, 3]
cis_handles = await devices[1].setup_cig(
cig_id=1,
cis_id=cid_ids,
sdu_interval=(10000, 0),
framing=0,
max_sdu=(120, 0),
retransmission_number=13,
max_transport_latency=(100, 0),
)
def on_cis_request(
connection: Connection, cis_handle: int, _cig_id: int, _cis_id: int
):
connection.abort_on('disconnection', devices[0].accept_cis_request(cis_handle))
devices[0].on('cis_request', on_cis_request)
cis_links = await devices[1].create_cis(
[(cis, connection.handle) for cis in cis_handles]
)
for cis_link in cis_links:
await cis_link.disconnect()
await asyncio.gather(
*[hci_transport.source.terminated for hci_transport in hci_transports]
)
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())
+2 -2
View File
@@ -63,8 +63,8 @@ async def main():
print(f'=== Connected to {connection.peer_address}!')
# Connect to the SDP Server
sdp_client = SDP_Client(connection)
await sdp_client.connect()
sdp_client = SDP_Client(device)
await sdp_client.connect(connection)
# List all services in the root browse group
service_record_handles = await sdp_client.search_services(
-87
View File
@@ -1,87 +0,0 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import dataclasses
import logging
import sys
import os
from bumble.core import BT_BR_EDR_TRANSPORT
from bumble.device import Device, ScoLink
from bumble.hci import HCI_Enhanced_Setup_Synchronous_Connection_Command
from bumble.hfp import DefaultCodecParameters, ESCO_PARAMETERS
from bumble.transport import open_transport_or_link
# -----------------------------------------------------------------------------
async def main() -> None:
if len(sys.argv) < 3:
print(
'Usage: run_esco_connection.py <config-file>'
'<transport-spec-for-device-1> <transport-spec-for-device-2>'
)
print(
'example: run_esco_connection.py classic1.json'
'tcp-client:127.0.0.1:6402 tcp-client:127.0.0.1:6402'
)
return
print('<<< connecting to HCI...')
hci_transports = await asyncio.gather(
open_transport_or_link(sys.argv[2]), open_transport_or_link(sys.argv[3])
)
print('<<< connected')
devices = [
Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
for hci_transport in hci_transports
]
devices[0].classic_enabled = True
devices[1].classic_enabled = True
await asyncio.gather(*[device.power_on() for device in devices])
connections = await asyncio.gather(
devices[0].accept(devices[1].public_address),
devices[1].connect(devices[0].public_address, transport=BT_BR_EDR_TRANSPORT),
)
def on_sco(sco_link: ScoLink):
connections[0].abort_on('disconnection', sco_link.disconnect())
devices[0].once('sco_connection', on_sco)
await devices[0].send_command(
HCI_Enhanced_Setup_Synchronous_Connection_Command(
connection_handle=connections[0].handle,
**ESCO_PARAMETERS[DefaultCodecParameters.ESCO_CVSD_S3].asdict(),
# type: ignore[call-args]
)
)
await asyncio.gather(
*[hci_transport.source.terminated for hci_transport in hci_transports]
)
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())
-69
View File
@@ -1,69 +0,0 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import sys
import os
from bumble.device import AdvertisingType, Device
from bumble.hci import Address, HCI_LE_Set_Extended_Advertising_Parameters_Command
from bumble.transport import open_transport_or_link
# -----------------------------------------------------------------------------
async def main() -> None:
if len(sys.argv) < 3:
print(
'Usage: run_extended_advertiser.py <config-file> <transport-spec> [type] [address]'
)
print('example: run_extended_advertiser.py device1.json usb:0')
return
if len(sys.argv) >= 4:
advertising_properties = (
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties(
int(sys.argv[3])
)
)
else:
advertising_properties = (
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING
)
if len(sys.argv) >= 5:
target = Address(sys.argv[4])
else:
target = Address.ANY
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
await device.power_on()
await device.start_extended_advertising(
advertising_properties=advertising_properties, target=target
)
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())
+3 -11
View File
@@ -31,7 +31,6 @@ from bumble.core import (
BT_BR_EDR_TRANSPORT,
)
from bumble import rfcomm, hfp
from bumble.hci import HCI_SynchronousDataPacket
from bumble.sdp import (
Client as SDP_Client,
DataElement,
@@ -49,8 +48,8 @@ logger = logging.getLogger(__name__)
# pylint: disable-next=too-many-nested-blocks
async def list_rfcomm_channels(device, connection):
# Connect to the SDP Server
sdp_client = SDP_Client(connection)
await sdp_client.connect()
sdp_client = SDP_Client(device)
await sdp_client.connect(connection)
# Search for services that support the Handsfree Profile
search_result = await sdp_client.search_attributes(
@@ -184,7 +183,7 @@ async def main():
# Create a client and start it
print('@@@ Starting to RFCOMM client...')
rfcomm_client = rfcomm.Client(connection)
rfcomm_client = rfcomm.Client(device, connection)
rfcomm_mux = await rfcomm_client.start()
print('@@@ Started')
@@ -198,13 +197,6 @@ async def main():
print('@@@ Disconnected from RFCOMM server')
return
def on_sco(connection_handle: int, packet: HCI_SynchronousDataPacket):
# Reset packet and loopback
packet.packet_status = 0
device.host.send_hci_packet(packet)
device.host.on('sco_packet', on_sco)
# Protocol loop (just for testing at this point)
protocol = hfp.HfpProtocol(session)
while True:
+9 -4
View File
@@ -22,9 +22,12 @@ import logging
from bumble.colors import color
import bumble.core
from bumble.device import Device
from bumble.transport import open_transport_or_link
from bumble.core import (
BT_L2CAP_PROTOCOL_ID,
BT_HIDP_PROTOCOL_ID,
BT_HUMAN_INTERFACE_DEVICE_SERVICE,
BT_BR_EDR_TRANSPORT,
)
@@ -32,6 +35,8 @@ from bumble.hci import Address
from bumble.hid import Host, Message
from bumble.sdp import (
Client as SDP_Client,
DataElement,
ServiceAttribute,
SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
@@ -70,11 +75,11 @@ SDP_HID_SSR_HOST_MIN_TIMEOUT_ATTRIBUTE_ID = 0x0210
# -----------------------------------------------------------------------------
async def get_hid_device_sdp_record(connection):
async def get_hid_device_sdp_record(device, connection):
# Connect to the SDP Server
sdp_client = SDP_Client(connection)
await sdp_client.connect()
sdp_client = SDP_Client(device)
await sdp_client.connect(connection)
if sdp_client:
print(color('Connected to SDP Server', 'blue'))
else:
@@ -343,7 +348,7 @@ async def main():
await connection.encrypt()
print('*** Encryption on')
await get_hid_device_sdp_record(connection)
await get_hid_device_sdp_record(device, connection)
# Create HID host and start it
print('@@@ Starting HID Host...')
+5 -5
View File
@@ -42,10 +42,10 @@ from bumble.sdp import (
# -----------------------------------------------------------------------------
async def list_rfcomm_channels(connection):
async def list_rfcomm_channels(device, connection):
# Connect to the SDP Server
sdp_client = SDP_Client(connection)
await sdp_client.connect()
sdp_client = SDP_Client(device)
await sdp_client.connect(connection)
# Search for services with an L2CAP service attribute
search_result = await sdp_client.search_attributes(
@@ -194,7 +194,7 @@ async def main():
channel = sys.argv[4]
if channel == 'discover':
await list_rfcomm_channels(connection)
await list_rfcomm_channels(device, connection)
return
# Request authentication
@@ -209,7 +209,7 @@ async def main():
# Create a client and start it
print('@@@ Starting RFCOMM client...')
rfcomm_client = Client(connection)
rfcomm_client = Client(device, connection)
rfcomm_mux = await rfcomm_client.start()
print('@@@ Started')
-15
View File
@@ -1,15 +0,0 @@
*.iml
.gradle
/local.properties
/.idea/caches
/.idea/libraries
/.idea/modules.xml
/.idea/workspace.xml
/.idea/navEditor.xml
/.idea/assetWizardSettings.xml
.DS_Store
/build
/captures
.externalNativeBuild
.cxx
local.properties
-1
View File
@@ -1 +0,0 @@
/build
@@ -1,70 +0,0 @@
@Suppress("DSL_SCOPE_VIOLATION") // TODO: Remove once KTIJ-19369 is fixed
plugins {
alias(libs.plugins.androidApplication)
alias(libs.plugins.kotlinAndroid)
}
android {
namespace = "com.github.google.bumble.btbench"
compileSdk = 34
defaultConfig {
applicationId = "com.github.google.bumble.btbench"
minSdk = 30
targetSdk = 34
versionCode = 1
versionName = "1.0"
testInstrumentationRunner = "androidx.test.runner.AndroidJUnitRunner"
vectorDrawables {
useSupportLibrary = true
}
}
buildTypes {
release {
isMinifyEnabled = false
proguardFiles(
getDefaultProguardFile("proguard-android-optimize.txt"),
"proguard-rules.pro"
)
}
}
compileOptions {
sourceCompatibility = JavaVersion.VERSION_1_8
targetCompatibility = JavaVersion.VERSION_1_8
}
kotlinOptions {
jvmTarget = "1.8"
}
buildFeatures {
compose = true
}
composeOptions {
kotlinCompilerExtensionVersion = "1.5.1"
}
packaging {
resources {
excludes += "/META-INF/{AL2.0,LGPL2.1}"
}
}
}
dependencies {
implementation(libs.core.ktx)
implementation(libs.lifecycle.runtime.ktx)
implementation(libs.activity.compose)
implementation(platform(libs.compose.bom))
implementation(libs.ui)
implementation(libs.ui.graphics)
implementation(libs.ui.tooling.preview)
implementation(libs.material3)
testImplementation(libs.junit)
androidTestImplementation(libs.androidx.test.ext.junit)
androidTestImplementation(libs.espresso.core)
androidTestImplementation(platform(libs.compose.bom))
androidTestImplementation(libs.ui.test.junit4)
debugImplementation(libs.ui.tooling)
debugImplementation(libs.ui.test.manifest)
}
-21
View File
@@ -1,21 +0,0 @@
# Add project specific ProGuard rules here.
# You can control the set of applied configuration files using the
# proguardFiles setting in build.gradle.
#
# For more details, see
# http://developer.android.com/guide/developing/tools/proguard.html
# If your project uses WebView with JS, uncomment the following
# and specify the fully qualified class name to the JavaScript interface
# class:
#-keepclassmembers class fqcn.of.javascript.interface.for.webview {
# public *;
#}
# Uncomment this to preserve the line number information for
# debugging stack traces.
#-keepattributes SourceFile,LineNumberTable
# If you keep the line number information, uncomment this to
# hide the original source file name.
#-renamesourcefileattribute SourceFile
@@ -1,40 +0,0 @@
<?xml version="1.0" encoding="utf-8"?>
<manifest xmlns:android="http://schemas.android.com/apk/res/android"
xmlns:tools="http://schemas.android.com/tools">
<!-- Request legacy Bluetooth permissions on older devices. -->
<uses-permission android:name="android.permission.BLUETOOTH" android:maxSdkVersion="30" />
<uses-permission android:name="android.permission.BLUETOOTH_ADMIN" android:maxSdkVersion="30" />
<uses-permission android:name="android.permission.BLUETOOTH_SCAN" android:usesPermissionFlags="neverForLocation"/>
<uses-permission android:name="android.permission.BLUETOOTH_ADVERTISE" />
<uses-permission android:name="android.permission.BLUETOOTH_CONNECT" />
<uses-feature android:name="android.hardware.bluetooth" android:required="true"/>
<uses-feature android:name="android.hardware.bluetooth_le" android:required="true"/>
<application
android:allowBackup="true"
android:dataExtractionRules="@xml/data_extraction_rules"
android:fullBackupContent="@xml/backup_rules"
android:icon="@mipmap/ic_launcher"
android:label="@string/app_name"
android:roundIcon="@mipmap/ic_launcher_round"
android:supportsRtl="true"
android:theme="@style/Theme.BTBench"
tools:targetApi="31">
<activity
android:name=".MainActivity"
android:exported="true"
android:label="@string/app_name"
android:theme="@style/Theme.BTBench">
<intent-filter>
<action android:name="android.intent.action.MAIN" />
<category android:name="android.intent.category.LAUNCHER" />
</intent-filter>
</activity>
<!-- <profileable android:shell="true"/>-->
</application>
</manifest>
Binary file not shown.

Before

Width:  |  Height:  |  Size: 42 KiB

@@ -1,35 +0,0 @@
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.github.google.bumble.btbench
import android.annotation.SuppressLint
import android.bluetooth.BluetoothAdapter
import java.io.IOException
import java.util.logging.Logger
import kotlin.concurrent.thread
private val Log = Logger.getLogger("btbench.l2cap-client")
class L2capClient(private val viewModel: AppViewModel, val bluetoothAdapter: BluetoothAdapter) {
@SuppressLint("MissingPermission")
fun run() {
viewModel.running = true
val remoteDevice = bluetoothAdapter.getRemoteDevice(viewModel.peerBluetoothAddress)
val socket = remoteDevice.createInsecureL2capChannel(viewModel.l2capPsm)
val client = SocketClient(viewModel, socket)
client.run()
}
}
@@ -1,62 +0,0 @@
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.github.google.bumble.btbench
import android.annotation.SuppressLint
import android.bluetooth.BluetoothAdapter
import android.bluetooth.le.AdvertiseCallback
import android.bluetooth.le.AdvertiseData
import android.bluetooth.le.AdvertiseSettings
import android.bluetooth.le.AdvertiseSettings.ADVERTISE_MODE_LOW_LATENCY
import android.os.Build
import java.io.IOException
import java.util.logging.Logger
import kotlin.concurrent.thread
private val Log = Logger.getLogger("btbench.l2cap-server")
class L2capServer(private val viewModel: AppViewModel, private val bluetoothAdapter: BluetoothAdapter) {
@SuppressLint("MissingPermission")
fun run() {
// Advertise to that the peer can find us and connect.
val callback = object: AdvertiseCallback() {
override fun onStartFailure(errorCode: Int) {
Log.warning("failed to start advertising: $errorCode")
}
override fun onStartSuccess(settingsInEffect: AdvertiseSettings) {
Log.info("advertising started: $settingsInEffect")
}
}
val advertiseSettingsBuilder = AdvertiseSettings.Builder()
.setAdvertiseMode(ADVERTISE_MODE_LOW_LATENCY)
.setConnectable(true)
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.UPSIDE_DOWN_CAKE) {
advertiseSettingsBuilder.setDiscoverable(true)
}
val advertiseSettings = advertiseSettingsBuilder.build()
val advertiseData = AdvertiseData.Builder().build()
val scanData = AdvertiseData.Builder().setIncludeDeviceName(true).build()
val advertiser = bluetoothAdapter.bluetoothLeAdvertiser
advertiser.startAdvertising(advertiseSettings, advertiseData, scanData, callback)
val serverSocket = bluetoothAdapter.listenUsingInsecureL2capChannel()
viewModel.l2capPsm = serverSocket.psm
Log.info("psm = $serverSocket.psm")
val server = SocketServer(viewModel, serverSocket)
server.run({ advertiser.stopAdvertising(callback) })
}
}
@@ -1,307 +0,0 @@
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.github.google.bumble.btbench
import android.Manifest
import android.annotation.SuppressLint
import android.bluetooth.BluetoothAdapter
import android.bluetooth.BluetoothManager
import android.content.Context
import android.content.Intent
import android.content.pm.PackageManager
import android.os.Build
import android.os.Bundle
import androidx.activity.ComponentActivity
import androidx.activity.compose.setContent
import androidx.activity.result.contract.ActivityResultContracts
import androidx.compose.foundation.layout.Column
import androidx.compose.foundation.layout.Row
import androidx.compose.foundation.layout.fillMaxSize
import androidx.compose.foundation.layout.fillMaxWidth
import androidx.compose.foundation.layout.padding
import androidx.compose.foundation.text.KeyboardActions
import androidx.compose.foundation.text.KeyboardOptions
import androidx.compose.material3.Button
import androidx.compose.material3.Divider
import androidx.compose.material3.MaterialTheme
import androidx.compose.material3.Slider
import androidx.compose.material3.Surface
import androidx.compose.material3.Text
import androidx.compose.material3.TextField
import androidx.compose.runtime.Composable
import androidx.compose.ui.ExperimentalComposeUiApi
import androidx.compose.ui.Modifier
import androidx.compose.ui.platform.LocalSoftwareKeyboardController
import androidx.compose.ui.text.font.FontWeight
import androidx.compose.ui.text.input.ImeAction
import androidx.compose.ui.text.input.KeyboardType
import androidx.compose.ui.text.style.TextAlign
import androidx.compose.ui.unit.dp
import androidx.compose.ui.unit.sp
import androidx.core.content.ContextCompat
import com.github.google.bumble.btbench.ui.theme.BTBenchTheme
import java.util.logging.Logger
private val Log = Logger.getLogger("bumble.main-activity")
const val PEER_BLUETOOTH_ADDRESS_PREF_KEY = "peer_bluetooth_address"
const val SENDER_PACKET_COUNT_PREF_KEY = "sender_packet_count"
const val SENDER_PACKET_SIZE_PREF_KEY = "sender_packet_size"
class MainActivity : ComponentActivity() {
private val appViewModel = AppViewModel()
private var bluetoothAdapter: BluetoothAdapter? = null
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
appViewModel.loadPreferences(getPreferences(Context.MODE_PRIVATE))
checkPermissions()
}
private fun checkPermissions() {
val neededPermissions = if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.S) {
arrayOf(
Manifest.permission.BLUETOOTH_ADVERTISE,
Manifest.permission.BLUETOOTH_SCAN,
Manifest.permission.BLUETOOTH_CONNECT
)
} else {
arrayOf(Manifest.permission.BLUETOOTH, Manifest.permission.BLUETOOTH_ADMIN)
}
val missingPermissions = neededPermissions.filter {
ContextCompat.checkSelfPermission(baseContext, it) != PackageManager.PERMISSION_GRANTED
}
if (missingPermissions.isEmpty()) {
start()
return
}
val requestPermissionsLauncher = registerForActivityResult(
ActivityResultContracts.RequestMultiplePermissions()
) { permissions ->
permissions.entries.forEach {
Log.info("permission: ${it.key} = ${it.value}")
}
val grantCount = permissions.count { it.value }
if (grantCount == neededPermissions.size) {
// We have all the permissions we need.
start()
} else {
Log.warning("not all permissions granted")
}
}
requestPermissionsLauncher.launch(missingPermissions.toTypedArray())
return
}
@SuppressLint("MissingPermission")
private fun initBluetooth() {
val bluetoothManager = ContextCompat.getSystemService(this, BluetoothManager::class.java)
bluetoothAdapter = bluetoothManager?.adapter
if (bluetoothAdapter == null) {
Log.warning("no bluetooth adapter")
return
}
if (!bluetoothAdapter!!.isEnabled) {
Log.warning("bluetooth not enabled")
return
}
}
private fun start() {
initBluetooth()
setContent {
MainView(
appViewModel,
::becomeDiscoverable,
::runRfcommClient,
::runRfcommServer,
::runL2capClient,
::runL2capServer
)
}
// Process intent parameters, if any.
intent.getStringExtra("peer-bluetooth-address")?.let {
appViewModel.peerBluetoothAddress = it
}
val packetCount = intent.getIntExtra("packet-count", 0)
if (packetCount > 0) {
appViewModel.senderPacketCount = packetCount
}
appViewModel.updateSenderPacketCountSlider()
val packetSize = intent.getIntExtra("packet-size", 0)
if (packetSize > 0) {
appViewModel.senderPacketSize = packetSize
}
appViewModel.updateSenderPacketSizeSlider()
intent.getStringExtra("autostart")?.let {
when (it) {
"rfcomm-client" -> runRfcommClient()
"rfcomm-server" -> runRfcommServer()
"l2cap-client" -> runL2capClient()
"l2cap-server" -> runL2capServer()
}
}
}
private fun runRfcommClient() {
val rfcommClient = bluetoothAdapter?.let { RfcommClient(appViewModel, it) }
rfcommClient?.run()
}
private fun runRfcommServer() {
val rfcommServer = bluetoothAdapter?.let { RfcommServer(appViewModel, it) }
rfcommServer?.run()
}
private fun runL2capClient() {
val l2capClient = bluetoothAdapter?.let { L2capClient(appViewModel, it) }
l2capClient?.run()
}
private fun runL2capServer() {
val l2capServer = bluetoothAdapter?.let { L2capServer(appViewModel, it) }
l2capServer?.run()
}
@SuppressLint("MissingPermission")
fun becomeDiscoverable() {
val discoverableIntent = Intent(BluetoothAdapter.ACTION_REQUEST_DISCOVERABLE)
discoverableIntent.putExtra(BluetoothAdapter.EXTRA_DISCOVERABLE_DURATION, 300)
startActivity(discoverableIntent)
}
}
@OptIn(ExperimentalComposeUiApi::class)
@Composable
fun MainView(
appViewModel: AppViewModel,
becomeDiscoverable: () -> Unit,
runRfcommClient: () -> Unit,
runRfcommServer: () -> Unit,
runL2capClient: () -> Unit,
runL2capServer: () -> Unit
) {
BTBenchTheme {
// A surface container using the 'background' color from the theme
Surface(
modifier = Modifier.fillMaxSize(), color = MaterialTheme.colorScheme.background
) {
Column(modifier = Modifier.padding(horizontal = 16.dp)) {
Text(
text = "Bumble Bench",
fontSize = 24.sp,
fontWeight = FontWeight.Bold,
textAlign = TextAlign.Center
)
Divider()
val keyboardController = LocalSoftwareKeyboardController.current
TextField(label = {
Text(text = "Peer Bluetooth Address")
},
value = appViewModel.peerBluetoothAddress,
modifier = Modifier.fillMaxWidth(),
keyboardOptions = KeyboardOptions.Default.copy(
keyboardType = KeyboardType.Ascii, imeAction = ImeAction.Done
),
onValueChange = {
appViewModel.updatePeerBluetoothAddress(it)
},
keyboardActions = KeyboardActions(onDone = { keyboardController?.hide() })
)
Divider()
TextField(label = {
Text(text = "L2CAP PSM")
},
value = appViewModel.l2capPsm.toString(),
modifier = Modifier.fillMaxWidth(),
keyboardOptions = KeyboardOptions.Default.copy(
keyboardType = KeyboardType.Number,
imeAction = ImeAction.Done
),
onValueChange = {
if (it.isNotEmpty()) {
val psm = it.toIntOrNull()
if (psm != null) {
appViewModel.l2capPsm = psm
}
}
},
keyboardActions = KeyboardActions(onDone = { keyboardController?.hide() }))
Divider()
Slider(
value = appViewModel.senderPacketCountSlider, onValueChange = {
appViewModel.senderPacketCountSlider = it
appViewModel.updateSenderPacketCount()
}, steps = 4
)
Text(text = "Packet Count: " + appViewModel.senderPacketCount.toString())
Divider()
Slider(
value = appViewModel.senderPacketSizeSlider, onValueChange = {
appViewModel.senderPacketSizeSlider = it
appViewModel.updateSenderPacketSize()
}, steps = 4
)
Text(text = "Packet Size: " + appViewModel.senderPacketSize.toString())
Divider()
ActionButton(
text = "Become Discoverable", onClick = becomeDiscoverable, true
)
Row() {
ActionButton(
text = "RFCOMM Client", onClick = runRfcommClient, !appViewModel.running
)
ActionButton(
text = "RFCOMM Server", onClick = runRfcommServer, !appViewModel.running
)
}
Row() {
ActionButton(
text = "L2CAP Client", onClick = runL2capClient, !appViewModel.running
)
ActionButton(
text = "L2CAP Server", onClick = runL2capServer, !appViewModel.running
)
}
Divider()
Text(
text = "Packets Sent: ${appViewModel.packetsSent}"
)
Text(
text = "Packets Received: ${appViewModel.packetsReceived}"
)
Text(
text = "Throughput: ${appViewModel.throughput}"
)
Divider()
ActionButton(
text = "Abort", onClick = appViewModel::abort, appViewModel.running
)
}
}
}
}
@Composable
fun ActionButton(text: String, onClick: () -> Unit, enabled: Boolean) {
Button(onClick = onClick, enabled = enabled) {
Text(text = text)
}
}
@@ -1,163 +0,0 @@
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.github.google.bumble.btbench
import android.content.SharedPreferences
import androidx.compose.runtime.getValue
import androidx.compose.runtime.mutableFloatStateOf
import androidx.compose.runtime.mutableIntStateOf
import androidx.compose.runtime.mutableStateOf
import androidx.compose.runtime.setValue
import androidx.lifecycle.ViewModel
import java.util.UUID
val DEFAULT_RFCOMM_UUID = UUID.fromString("E6D55659-C8B4-4B85-96BB-B1143AF6D3AE")
const val DEFAULT_PEER_BLUETOOTH_ADDRESS = "AA:BB:CC:DD:EE:FF"
const val DEFAULT_SENDER_PACKET_COUNT = 100
const val DEFAULT_SENDER_PACKET_SIZE = 1024
class AppViewModel : ViewModel() {
private var preferences: SharedPreferences? = null
var peerBluetoothAddress by mutableStateOf(DEFAULT_PEER_BLUETOOTH_ADDRESS)
var l2capPsm by mutableStateOf(0)
var senderPacketCountSlider by mutableFloatStateOf(0.0F)
var senderPacketSizeSlider by mutableFloatStateOf(0.0F)
var senderPacketCount by mutableIntStateOf(DEFAULT_SENDER_PACKET_COUNT)
var senderPacketSize by mutableIntStateOf(DEFAULT_SENDER_PACKET_SIZE)
var packetsSent by mutableIntStateOf(0)
var packetsReceived by mutableIntStateOf(0)
var throughput by mutableIntStateOf(0)
var running by mutableStateOf(false)
var aborter: (() -> Unit)? = null
fun loadPreferences(preferences: SharedPreferences) {
this.preferences = preferences
val savedPeerBluetoothAddress = preferences.getString(PEER_BLUETOOTH_ADDRESS_PREF_KEY, null)
if (savedPeerBluetoothAddress != null) {
peerBluetoothAddress = savedPeerBluetoothAddress
}
val savedSenderPacketCount = preferences.getInt(SENDER_PACKET_COUNT_PREF_KEY, 0)
if (savedSenderPacketCount != 0) {
senderPacketCount = savedSenderPacketCount
}
updateSenderPacketCountSlider()
val savedSenderPacketSize = preferences.getInt(SENDER_PACKET_SIZE_PREF_KEY, 0)
if (savedSenderPacketSize != 0) {
senderPacketSize = savedSenderPacketSize
}
updateSenderPacketSizeSlider()
}
fun updatePeerBluetoothAddress(peerBluetoothAddress: String) {
this.peerBluetoothAddress = peerBluetoothAddress
// Save the address to the preferences
with(preferences!!.edit()) {
putString(PEER_BLUETOOTH_ADDRESS_PREF_KEY, peerBluetoothAddress)
apply()
}
}
fun updateSenderPacketCountSlider() {
if (senderPacketCount <= 10) {
senderPacketCountSlider = 0.0F
} else if (senderPacketCount <= 50) {
senderPacketCountSlider = 0.2F
} else if (senderPacketCount <= 100) {
senderPacketCountSlider = 0.4F
} else if (senderPacketCount <= 500) {
senderPacketCountSlider = 0.6F
} else if (senderPacketCount <= 1000) {
senderPacketCountSlider = 0.8F
} else {
senderPacketCountSlider = 1.0F
}
with(preferences!!.edit()) {
putInt(SENDER_PACKET_COUNT_PREF_KEY, senderPacketCount)
apply()
}
}
fun updateSenderPacketCount() {
if (senderPacketCountSlider < 0.1F) {
senderPacketCount = 10
} else if (senderPacketCountSlider < 0.3F) {
senderPacketCount = 50
} else if (senderPacketCountSlider < 0.5F) {
senderPacketCount = 100
} else if (senderPacketCountSlider < 0.7F) {
senderPacketCount = 500
} else if (senderPacketCountSlider < 0.9F) {
senderPacketCount = 1000
} else {
senderPacketCount = 10000
}
with(preferences!!.edit()) {
putInt(SENDER_PACKET_COUNT_PREF_KEY, senderPacketCount)
apply()
}
}
fun updateSenderPacketSizeSlider() {
if (senderPacketSize <= 1) {
senderPacketSizeSlider = 0.0F
} else if (senderPacketSize <= 256) {
senderPacketSizeSlider = 0.02F
} else if (senderPacketSize <= 512) {
senderPacketSizeSlider = 0.4F
} else if (senderPacketSize <= 1024) {
senderPacketSizeSlider = 0.6F
} else if (senderPacketSize <= 2048) {
senderPacketSizeSlider = 0.8F
} else {
senderPacketSizeSlider = 1.0F
}
with(preferences!!.edit()) {
putInt(SENDER_PACKET_SIZE_PREF_KEY, senderPacketSize)
apply()
}
}
fun updateSenderPacketSize() {
if (senderPacketSizeSlider < 0.1F) {
senderPacketSize = 1
} else if (senderPacketSizeSlider < 0.3F) {
senderPacketSize = 256
} else if (senderPacketSizeSlider < 0.5F) {
senderPacketSize = 512
} else if (senderPacketSizeSlider < 0.7F) {
senderPacketSize = 1024
} else if (senderPacketSizeSlider < 0.9F) {
senderPacketSize = 2048
} else {
senderPacketSize = 4096
}
with(preferences!!.edit()) {
putInt(SENDER_PACKET_SIZE_PREF_KEY, senderPacketSize)
apply()
}
}
fun abort() {
aborter?.let { it() }
}
}
@@ -1,178 +0,0 @@
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.github.google.bumble.btbench
import android.bluetooth.BluetoothSocket
import java.io.IOException
import java.nio.ByteBuffer
import java.util.logging.Logger
import kotlin.math.min
private val Log = Logger.getLogger("btbench.packet")
fun ByteArray.toHex(): String = joinToString(separator = "") { eachByte -> "%02x".format(eachByte) }
abstract class Packet(val type: Int, val payload: ByteArray = ByteArray(0)) {
companion object {
const val RESET = 0
const val SEQUENCE = 1
const val ACK = 2
const val LAST_FLAG = 1
fun from(data: ByteArray): Packet {
return when (data[0].toInt()) {
RESET -> ResetPacket()
SEQUENCE -> SequencePacket(
data[1].toInt(),
ByteBuffer.wrap(data, 2, 4).getInt(),
data.sliceArray(6..<data.size)
)
ACK -> AckPacket(data[1].toInt(), ByteBuffer.wrap(data, 2, 4).getInt())
else -> GenericPacket(data[0].toInt(), data.sliceArray(1..<data.size))
}
}
}
open fun toBytes(): ByteArray {
return ByteBuffer.allocate(1 + payload.size).put(type.toByte()).put(payload).array()
}
}
class GenericPacket(type: Int, payload: ByteArray) : Packet(type, payload)
class ResetPacket : Packet(RESET)
class AckPacket(val flags: Int, val sequenceNumber: Int) : Packet(ACK) {
override fun toBytes(): ByteArray {
return ByteBuffer.allocate(1 + 1 + 4).put(type.toByte()).put(flags.toByte())
.putInt(sequenceNumber).array()
}
}
class SequencePacket(val flags: Int, val sequenceNumber: Int, payload: ByteArray) :
Packet(SEQUENCE, payload) {
override fun toBytes(): ByteArray {
return ByteBuffer.allocate(1 + 1 + 4 + payload.size).put(type.toByte()).put(flags.toByte())
.putInt(sequenceNumber).put(payload).array()
}
}
abstract class PacketSink {
fun onPacket(packet: Packet) {
when (packet) {
is ResetPacket -> onResetPacket()
is AckPacket -> onAckPacket()
is SequencePacket -> onSequencePacket(packet)
}
}
abstract fun onResetPacket()
abstract fun onAckPacket()
abstract fun onSequencePacket(packet: SequencePacket)
}
interface DataSink {
fun onData(data: ByteArray)
}
interface PacketIO {
var packetSink: PacketSink?
fun sendPacket(packet: Packet)
}
class StreamedPacketIO(private val dataSink: DataSink) : PacketIO {
private var bytesNeeded: Int = 0
private var rxPacket: ByteBuffer? = null
private var rxHeader = ByteBuffer.allocate(2)
override var packetSink: PacketSink? = null
fun onData(data: ByteArray) {
var current = data
while (current.isNotEmpty()) {
if (bytesNeeded > 0) {
val chunk = current.sliceArray(0..<min(bytesNeeded, current.size))
rxPacket!!.put(chunk)
current = current.sliceArray(chunk.size..<current.size)
bytesNeeded -= chunk.size
if (bytesNeeded == 0) {
// Packet completed.
//Log.fine("packet complete: ${current.toHex()}")
packetSink?.onPacket(Packet.from(rxPacket!!.array()))
// Reset.
reset()
}
} else {
val headerBytesNeeded = 2 - rxHeader.position()
val headerBytes = current.sliceArray(0..<min(headerBytesNeeded, current.size))
current = current.sliceArray(headerBytes.size..<current.size)
rxHeader.put(headerBytes)
if (rxHeader.position() != 2) {
return
}
bytesNeeded = rxHeader.getShort(0).toInt()
if (bytesNeeded == 0) {
Log.warning("found 0 size packet!")
reset()
return
}
rxPacket = ByteBuffer.allocate(bytesNeeded)
}
}
}
private fun reset() {
rxPacket = null
rxHeader.position(0)
}
override fun sendPacket(packet: Packet) {
val packetBytes = packet.toBytes()
val packetData =
ByteBuffer.allocate(2 + packetBytes.size).putShort(packetBytes.size.toShort())
.put(packetBytes).array()
dataSink.onData(packetData)
}
}
class SocketDataSink(private val socket: BluetoothSocket) : DataSink {
override fun onData(data: ByteArray) {
socket.outputStream.write(data)
}
}
class SocketDataSource(
private val socket: BluetoothSocket,
private val onData: (data: ByteArray) -> Unit
) {
fun receive() {
val buffer = ByteArray(4096)
do {
try {
val bytesRead = socket.inputStream.read(buffer)
if (bytesRead <= 0) {
break
}
onData(buffer.sliceArray(0..<bytesRead))
} catch (error: IOException) {
Log.warning("IO Exception: $error")
break
}
} while (true)
Log.info("end of stream")
}
}
@@ -1,60 +0,0 @@
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.github.google.bumble.btbench
import java.util.logging.Logger
import kotlin.time.DurationUnit
import kotlin.time.TimeSource
private val Log = Logger.getLogger("btbench.receiver")
class Receiver(private val viewModel: AppViewModel, private val packetIO: PacketIO) : PacketSink() {
private var startTime: TimeSource.Monotonic.ValueTimeMark = TimeSource.Monotonic.markNow()
private var lastPacketTime: TimeSource.Monotonic.ValueTimeMark = TimeSource.Monotonic.markNow()
private var bytesReceived = 0
init {
packetIO.packetSink = this
}
override fun onResetPacket() {
startTime = TimeSource.Monotonic.markNow()
lastPacketTime = startTime
bytesReceived = 0
viewModel.throughput = 0
viewModel.packetsSent = 0
viewModel.packetsReceived = 0
}
override fun onAckPacket() {
}
override fun onSequencePacket(packet: SequencePacket) {
val received = packet.payload.size + 6
bytesReceived += received
val now = TimeSource.Monotonic.markNow()
lastPacketTime = now
viewModel.packetsReceived += 1
if (packet.flags and Packet.LAST_FLAG != 0) {
Log.info("received last packet")
val elapsed = now - startTime
val throughput = (bytesReceived / elapsed.toDouble(DurationUnit.SECONDS)).toInt()
Log.info("throughput: $throughput")
viewModel.throughput = throughput
packetIO.sendPacket(AckPacket(packet.flags, packet.sequenceNumber))
}
}
}
@@ -1,36 +0,0 @@
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.github.google.bumble.btbench
import android.annotation.SuppressLint
import android.bluetooth.BluetoothAdapter
import java.io.IOException
import java.util.logging.Logger
import kotlin.concurrent.thread
private val Log = Logger.getLogger("btbench.rfcomm-client")
class RfcommClient(private val viewModel: AppViewModel, val bluetoothAdapter: BluetoothAdapter) {
@SuppressLint("MissingPermission")
fun run() {
val remoteDevice = bluetoothAdapter.getRemoteDevice(viewModel.peerBluetoothAddress)
val socket = remoteDevice.createInsecureRfcommSocketToServiceRecord(
DEFAULT_RFCOMM_UUID
)
val client = SocketClient(viewModel, socket)
client.run()
}
}
@@ -1,35 +0,0 @@
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.github.google.bumble.btbench
import android.annotation.SuppressLint
import android.bluetooth.BluetoothAdapter
import java.io.IOException
import java.util.logging.Logger
import kotlin.concurrent.thread
private val Log = Logger.getLogger("btbench.rfcomm-server")
class RfcommServer(private val viewModel: AppViewModel, val bluetoothAdapter: BluetoothAdapter) {
@SuppressLint("MissingPermission")
fun run() {
val serverSocket = bluetoothAdapter.listenUsingInsecureRfcommWithServiceRecord(
"BumbleBench", DEFAULT_RFCOMM_UUID
)
val server = SocketServer(viewModel, serverSocket)
server.run({})
}
}
@@ -1,84 +0,0 @@
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.github.google.bumble.btbench
import java.util.concurrent.Semaphore
import java.util.logging.Logger
import kotlin.time.DurationUnit
import kotlin.time.TimeSource
private val Log = Logger.getLogger("btbench.sender")
class Sender(private val viewModel: AppViewModel, private val packetIO: PacketIO) : PacketSink() {
private var startTime: TimeSource.Monotonic.ValueTimeMark = TimeSource.Monotonic.markNow()
private var bytesSent = 0
private val done = Semaphore(0)
init {
packetIO.packetSink = this
}
fun run() {
viewModel.packetsSent = 0
viewModel.packetsReceived = 0
viewModel.throughput = 0
Log.info("sending reset")
packetIO.sendPacket(ResetPacket())
startTime = TimeSource.Monotonic.markNow()
val packetCount = viewModel.senderPacketCount
val packetSize = viewModel.senderPacketSize
for (i in 0..<packetCount - 1) {
packetIO.sendPacket(SequencePacket(0, i, ByteArray(packetSize - 6)))
bytesSent += packetSize
viewModel.packetsSent = i + 1
}
packetIO.sendPacket(
SequencePacket(
Packet.LAST_FLAG,
packetCount - 1,
ByteArray(packetSize - 6)
)
)
bytesSent += packetSize
viewModel.packetsSent = packetCount
// Wait for the ACK
Log.info("waiting for ACK")
done.acquire()
Log.info("got ACK")
}
fun abort() {
done.release()
}
override fun onResetPacket() {
}
override fun onAckPacket() {
Log.info("received ACK")
val elapsed = TimeSource.Monotonic.markNow() - startTime
val throughput = (bytesSent / elapsed.toDouble(DurationUnit.SECONDS)).toInt()
Log.info("throughput: $throughput")
viewModel.throughput = throughput
done.release()
}
override fun onSequencePacket(packet: SequencePacket) {
}
}
@@ -1,63 +0,0 @@
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.github.google.bumble.btbench
import android.annotation.SuppressLint
import android.bluetooth.BluetoothSocket
import java.io.IOException
import java.util.logging.Logger
import kotlin.concurrent.thread
private val Log = Logger.getLogger("btbench.socket-client")
class SocketClient(private val viewModel: AppViewModel, private val socket: BluetoothSocket) {
@SuppressLint("MissingPermission")
fun run() {
viewModel.running = true
val socketDataSink = SocketDataSink(socket)
val streamIO = StreamedPacketIO(socketDataSink)
val socketDataSource = SocketDataSource(socket, streamIO::onData)
val sender = Sender(viewModel, streamIO)
fun cleanup() {
socket.close()
viewModel.aborter = {}
viewModel.running = false
}
thread(name = "SocketClient") {
viewModel.aborter = {
sender.abort()
socket.close()
}
Log.info("connecting to remote")
try {
socket.connect()
} catch (error: IOException) {
Log.warning("connection failed")
cleanup()
return@thread
}
Log.info("connected")
thread {
socketDataSource.receive()
}
sender.run()
cleanup()
}
}
}
@@ -1,66 +0,0 @@
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.github.google.bumble.btbench
import android.bluetooth.BluetoothServerSocket
import java.io.IOException
import java.util.logging.Logger
import kotlin.concurrent.thread
private val Log = Logger.getLogger("btbench.socket-server")
class SocketServer(private val viewModel: AppViewModel, private val serverSocket: BluetoothServerSocket) {
fun run(onTerminate: () -> Unit) {
var aborted = false
viewModel.running = true
fun cleanup() {
serverSocket.close()
viewModel.running = false
onTerminate()
}
thread(name = "SocketServer") {
while (!aborted) {
viewModel.aborter = {
serverSocket.close()
}
Log.info("waiting for connection...")
val socket = try {
serverSocket.accept()
} catch (error: IOException) {
Log.warning("server socket closed")
cleanup()
return@thread
}
Log.info("got connection")
viewModel.aborter = {
aborted = true
socket.close()
}
viewModel.peerBluetoothAddress = socket.remoteDevice.address
val socketDataSink = SocketDataSink(socket)
val streamIO = StreamedPacketIO(socketDataSink)
val socketDataSource = SocketDataSource(socket, streamIO::onData)
val receiver = Receiver(viewModel, streamIO)
socketDataSource.receive()
socket.close()
}
cleanup()
}
}
}
@@ -1,11 +0,0 @@
package com.github.google.bumble.btbench.ui.theme
import androidx.compose.ui.graphics.Color
val Purple80 = Color(0xFFD0BCFF)
val PurpleGrey80 = Color(0xFFCCC2DC)
val Pink80 = Color(0xFFEFB8C8)
val Purple40 = Color(0xFF6650a4)
val PurpleGrey40 = Color(0xFF625b71)
val Pink40 = Color(0xFF7D5260)
@@ -1,63 +0,0 @@
package com.github.google.bumble.btbench.ui.theme
import android.app.Activity
import android.os.Build
import androidx.compose.foundation.isSystemInDarkTheme
import androidx.compose.material3.MaterialTheme
import androidx.compose.material3.darkColorScheme
import androidx.compose.material3.dynamicDarkColorScheme
import androidx.compose.material3.dynamicLightColorScheme
import androidx.compose.material3.lightColorScheme
import androidx.compose.runtime.Composable
import androidx.compose.runtime.SideEffect
import androidx.compose.ui.graphics.toArgb
import androidx.compose.ui.platform.LocalContext
import androidx.compose.ui.platform.LocalView
import androidx.core.view.WindowCompat
private val DarkColorScheme = darkColorScheme(
primary = Purple80, secondary = PurpleGrey80, tertiary = Pink80
)
private val LightColorScheme = lightColorScheme(
primary = Purple40, secondary = PurpleGrey40, tertiary = Pink40
/* Other default colors to override
background = Color(0xFFFFFBFE),
surface = Color(0xFFFFFBFE),
onPrimary = Color.White,
onSecondary = Color.White,
onTertiary = Color.White,
onBackground = Color(0xFF1C1B1F),
onSurface = Color(0xFF1C1B1F),
*/
)
@Composable
fun BTBenchTheme(
darkTheme: Boolean = isSystemInDarkTheme(),
// Dynamic color is available on Android 12+
dynamicColor: Boolean = true, content: @Composable () -> Unit
) {
val colorScheme = when {
dynamicColor && Build.VERSION.SDK_INT >= Build.VERSION_CODES.S -> {
val context = LocalContext.current
if (darkTheme) dynamicDarkColorScheme(context) else dynamicLightColorScheme(context)
}
darkTheme -> DarkColorScheme
else -> LightColorScheme
}
val view = LocalView.current
if (!view.isInEditMode) {
SideEffect {
val window = (view.context as Activity).window
window.statusBarColor = colorScheme.primary.toArgb()
WindowCompat.getInsetsController(window, view).isAppearanceLightStatusBars = darkTheme
}
}
MaterialTheme(
colorScheme = colorScheme, typography = Typography, content = content
)
}
@@ -1,33 +0,0 @@
package com.github.google.bumble.btbench.ui.theme
import androidx.compose.material3.Typography
import androidx.compose.ui.text.TextStyle
import androidx.compose.ui.text.font.FontFamily
import androidx.compose.ui.text.font.FontWeight
import androidx.compose.ui.unit.sp
// Set of Material typography styles to start with
val Typography = Typography(
bodyLarge = TextStyle(
fontFamily = FontFamily.Default,
fontWeight = FontWeight.Normal,
fontSize = 16.sp,
lineHeight = 24.sp,
letterSpacing = 0.5.sp
)/* Other default text styles to override
titleLarge = TextStyle(
fontFamily = FontFamily.Default,
fontWeight = FontWeight.Normal,
fontSize = 22.sp,
lineHeight = 28.sp,
letterSpacing = 0.sp
),
labelSmall = TextStyle(
fontFamily = FontFamily.Default,
fontWeight = FontWeight.Medium,
fontSize = 11.sp,
lineHeight = 16.sp,
letterSpacing = 0.5.sp
)
*/
)
@@ -1,74 +0,0 @@
<?xml version="1.0" encoding="utf-8"?>
<vector
android:height="108dp"
android:width="108dp"
android:viewportHeight="108"
android:viewportWidth="108"
xmlns:android="http://schemas.android.com/apk/res/android">
<path android:fillColor="#3DDC84"
android:pathData="M0,0h108v108h-108z"/>
<path android:fillColor="#00000000" android:pathData="M9,0L9,108"
android:strokeColor="#33FFFFFF" android:strokeWidth="0.8"/>
<path android:fillColor="#00000000" android:pathData="M19,0L19,108"
android:strokeColor="#33FFFFFF" android:strokeWidth="0.8"/>
<path android:fillColor="#00000000" android:pathData="M29,0L29,108"
android:strokeColor="#33FFFFFF" android:strokeWidth="0.8"/>
<path android:fillColor="#00000000" android:pathData="M39,0L39,108"
android:strokeColor="#33FFFFFF" android:strokeWidth="0.8"/>
<path android:fillColor="#00000000" android:pathData="M49,0L49,108"
android:strokeColor="#33FFFFFF" android:strokeWidth="0.8"/>
<path android:fillColor="#00000000" android:pathData="M59,0L59,108"
android:strokeColor="#33FFFFFF" android:strokeWidth="0.8"/>
<path android:fillColor="#00000000" android:pathData="M69,0L69,108"
android:strokeColor="#33FFFFFF" android:strokeWidth="0.8"/>
<path android:fillColor="#00000000" android:pathData="M79,0L79,108"
android:strokeColor="#33FFFFFF" android:strokeWidth="0.8"/>
<path android:fillColor="#00000000" android:pathData="M89,0L89,108"
android:strokeColor="#33FFFFFF" android:strokeWidth="0.8"/>
<path android:fillColor="#00000000" android:pathData="M99,0L99,108"
android:strokeColor="#33FFFFFF" android:strokeWidth="0.8"/>
<path android:fillColor="#00000000" android:pathData="M0,9L108,9"
android:strokeColor="#33FFFFFF" android:strokeWidth="0.8"/>
<path android:fillColor="#00000000" android:pathData="M0,19L108,19"
android:strokeColor="#33FFFFFF" android:strokeWidth="0.8"/>
<path android:fillColor="#00000000" android:pathData="M0,29L108,29"
android:strokeColor="#33FFFFFF" android:strokeWidth="0.8"/>
<path android:fillColor="#00000000" android:pathData="M0,39L108,39"
android:strokeColor="#33FFFFFF" android:strokeWidth="0.8"/>
<path android:fillColor="#00000000" android:pathData="M0,49L108,49"
android:strokeColor="#33FFFFFF" android:strokeWidth="0.8"/>
<path android:fillColor="#00000000" android:pathData="M0,59L108,59"
android:strokeColor="#33FFFFFF" android:strokeWidth="0.8"/>
<path android:fillColor="#00000000" android:pathData="M0,69L108,69"
android:strokeColor="#33FFFFFF" android:strokeWidth="0.8"/>
<path android:fillColor="#00000000" android:pathData="M0,79L108,79"
android:strokeColor="#33FFFFFF" android:strokeWidth="0.8"/>
<path android:fillColor="#00000000" android:pathData="M0,89L108,89"
android:strokeColor="#33FFFFFF" android:strokeWidth="0.8"/>
<path android:fillColor="#00000000" android:pathData="M0,99L108,99"
android:strokeColor="#33FFFFFF" android:strokeWidth="0.8"/>
<path android:fillColor="#00000000" android:pathData="M19,29L89,29"
android:strokeColor="#33FFFFFF" android:strokeWidth="0.8"/>
<path android:fillColor="#00000000" android:pathData="M19,39L89,39"
android:strokeColor="#33FFFFFF" android:strokeWidth="0.8"/>
<path android:fillColor="#00000000" android:pathData="M19,49L89,49"
android:strokeColor="#33FFFFFF" android:strokeWidth="0.8"/>
<path android:fillColor="#00000000" android:pathData="M19,59L89,59"
android:strokeColor="#33FFFFFF" android:strokeWidth="0.8"/>
<path android:fillColor="#00000000" android:pathData="M19,69L89,69"
android:strokeColor="#33FFFFFF" android:strokeWidth="0.8"/>
<path android:fillColor="#00000000" android:pathData="M19,79L89,79"
android:strokeColor="#33FFFFFF" android:strokeWidth="0.8"/>
<path android:fillColor="#00000000" android:pathData="M29,19L29,89"
android:strokeColor="#33FFFFFF" android:strokeWidth="0.8"/>
<path android:fillColor="#00000000" android:pathData="M39,19L39,89"
android:strokeColor="#33FFFFFF" android:strokeWidth="0.8"/>
<path android:fillColor="#00000000" android:pathData="M49,19L49,89"
android:strokeColor="#33FFFFFF" android:strokeWidth="0.8"/>
<path android:fillColor="#00000000" android:pathData="M59,19L59,89"
android:strokeColor="#33FFFFFF" android:strokeWidth="0.8"/>
<path android:fillColor="#00000000" android:pathData="M69,19L69,89"
android:strokeColor="#33FFFFFF" android:strokeWidth="0.8"/>
<path android:fillColor="#00000000" android:pathData="M79,19L79,89"
android:strokeColor="#33FFFFFF" android:strokeWidth="0.8"/>
</vector>
@@ -1,30 +0,0 @@
<vector xmlns:android="http://schemas.android.com/apk/res/android"
xmlns:aapt="http://schemas.android.com/aapt"
android:width="108dp"
android:height="108dp"
android:viewportWidth="108"
android:viewportHeight="108">
<path android:pathData="M31,63.928c0,0 6.4,-11 12.1,-13.1c7.2,-2.6 26,-1.4 26,-1.4l38.1,38.1L107,108.928l-32,-1L31,63.928z">
<aapt:attr name="android:fillColor">
<gradient
android:endX="85.84757"
android:endY="92.4963"
android:startX="42.9492"
android:startY="49.59793"
android:type="linear">
<item
android:color="#44000000"
android:offset="0.0" />
<item
android:color="#00000000"
android:offset="1.0" />
</gradient>
</aapt:attr>
</path>
<path
android:fillColor="#FFFFFF"
android:fillType="nonZero"
android:pathData="M65.3,45.828l3.8,-6.6c0.2,-0.4 0.1,-0.9 -0.3,-1.1c-0.4,-0.2 -0.9,-0.1 -1.1,0.3l-3.9,6.7c-6.3,-2.8 -13.4,-2.8 -19.7,0l-3.9,-6.7c-0.2,-0.4 -0.7,-0.5 -1.1,-0.3C38.8,38.328 38.7,38.828 38.9,39.228l3.8,6.6C36.2,49.428 31.7,56.028 31,63.928h46C76.3,56.028 71.8,49.428 65.3,45.828zM43.4,57.328c-0.8,0 -1.5,-0.5 -1.8,-1.2c-0.3,-0.7 -0.1,-1.5 0.4,-2.1c0.5,-0.5 1.4,-0.7 2.1,-0.4c0.7,0.3 1.2,1 1.2,1.8C45.3,56.528 44.5,57.328 43.4,57.328L43.4,57.328zM64.6,57.328c-0.8,0 -1.5,-0.5 -1.8,-1.2s-0.1,-1.5 0.4,-2.1c0.5,-0.5 1.4,-0.7 2.1,-0.4c0.7,0.3 1.2,1 1.2,1.8C66.5,56.528 65.6,57.328 64.6,57.328L64.6,57.328z"
android:strokeWidth="1"
android:strokeColor="#00000000" />
</vector>
@@ -1,5 +0,0 @@
<?xml version="1.0" encoding="utf-8"?>
<adaptive-icon xmlns:android="http://schemas.android.com/apk/res/android">
<background android:drawable="@color/ic_launcher_background"/>
<foreground android:drawable="@mipmap/ic_launcher_foreground"/>
</adaptive-icon>
@@ -1,5 +0,0 @@
<?xml version="1.0" encoding="utf-8"?>
<adaptive-icon xmlns:android="http://schemas.android.com/apk/res/android">
<background android:drawable="@color/ic_launcher_background"/>
<foreground android:drawable="@mipmap/ic_launcher_foreground"/>
</adaptive-icon>
Binary file not shown.

Before

Width:  |  Height:  |  Size: 3.5 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 4.2 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 2.3 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 2.7 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 4.6 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 6.1 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 6.9 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 9.4 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 9.5 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 13 KiB

@@ -1,10 +0,0 @@
<?xml version="1.0" encoding="utf-8"?>
<resources>
<color name="purple_200">#FFBB86FC</color>
<color name="purple_500">#FF6200EE</color>
<color name="purple_700">#FF3700B3</color>
<color name="teal_200">#FF03DAC5</color>
<color name="teal_700">#FF018786</color>
<color name="black">#FF000000</color>
<color name="white">#FFFFFFFF</color>
</resources>
@@ -1,4 +0,0 @@
<?xml version="1.0" encoding="utf-8"?>
<resources>
<color name="ic_launcher_background">#FFFFFF</color>
</resources>
@@ -1,3 +0,0 @@
<resources>
<string name="app_name">BT Bench</string>
</resources>
@@ -1,5 +0,0 @@
<?xml version="1.0" encoding="utf-8"?>
<resources>
<style name="Theme.BTBench" parent="android:Theme.Material.Light.NoActionBar" />
</resources>
@@ -1,13 +0,0 @@
<?xml version="1.0" encoding="utf-8"?><!--
Sample backup rules file; uncomment and customize as necessary.
See https://developer.android.com/guide/topics/data/autobackup
for details.
Note: This file is ignored for devices older that API 31
See https://developer.android.com/about/versions/12/backup-restore
-->
<full-backup-content>
<!--
<include domain="sharedpref" path="."/>
<exclude domain="sharedpref" path="device.xml"/>
-->
</full-backup-content>
@@ -1,19 +0,0 @@
<?xml version="1.0" encoding="utf-8"?><!--
Sample data extraction rules file; uncomment and customize as necessary.
See https://developer.android.com/about/versions/12/backup-restore#xml-changes
for details.
-->
<data-extraction-rules>
<cloud-backup>
<!-- TODO: Use <include> and <exclude> to control what is backed up.
<include .../>
<exclude .../>
-->
</cloud-backup>
<!--
<device-transfer>
<include .../>
<exclude .../>
</device-transfer>
-->
</data-extraction-rules>
-7
View File
@@ -1,7 +0,0 @@
// Top-level build file where you can add configuration options common to all sub-projects/modules.
@Suppress("DSL_SCOPE_VIOLATION") // TODO: Remove once KTIJ-19369 is fixed
plugins {
alias(libs.plugins.androidApplication) apply false
alias(libs.plugins.kotlinAndroid) apply false
}
true // Needed to make the Suppress annotation work for the plugins block
-23
View File
@@ -1,23 +0,0 @@
# Project-wide Gradle settings.
# IDE (e.g. Android Studio) users:
# Gradle settings configured through the IDE *will override*
# any settings specified in this file.
# For more details on how to configure your build environment visit
# http://www.gradle.org/docs/current/userguide/build_environment.html
# Specifies the JVM arguments used for the daemon process.
# The setting is particularly useful for tweaking memory settings.
org.gradle.jvmargs=-Xmx2048m -Dfile.encoding=UTF-8
# When configured, Gradle will run in incubating parallel mode.
# This option should only be used with decoupled projects. More details, visit
# http://www.gradle.org/docs/current/userguide/multi_project_builds.html#sec:decoupled_projects
# org.gradle.parallel=true
# AndroidX package structure to make it clearer which packages are bundled with the
# Android operating system, and which are packaged with your app's APK
# https://developer.android.com/topic/libraries/support-library/androidx-rn
android.useAndroidX=true
# Kotlin code style for this project: "official" or "obsolete":
kotlin.code.style=official
# Enables namespacing of each library's R class so that its R class includes only the
# resources declared in the library itself and none from the library's dependencies,
# thereby reducing the size of the R class for that library
android.nonTransitiveRClass=true
@@ -1,31 +0,0 @@
[versions]
agp = "8.2.0"
kotlin = "1.9.0"
core-ktx = "1.12.0"
junit = "4.13.2"
androidx-test-ext-junit = "1.1.5"
espresso-core = "3.5.1"
lifecycle-runtime-ktx = "2.6.2"
activity-compose = "1.7.2"
compose-bom = "2023.08.00"
[libraries]
core-ktx = { group = "androidx.core", name = "core-ktx", version.ref = "core-ktx" }
junit = { group = "junit", name = "junit", version.ref = "junit" }
androidx-test-ext-junit = { group = "androidx.test.ext", name = "junit", version.ref = "androidx-test-ext-junit" }
espresso-core = { group = "androidx.test.espresso", name = "espresso-core", version.ref = "espresso-core" }
lifecycle-runtime-ktx = { group = "androidx.lifecycle", name = "lifecycle-runtime-ktx", version.ref = "lifecycle-runtime-ktx" }
activity-compose = { group = "androidx.activity", name = "activity-compose", version.ref = "activity-compose" }
compose-bom = { group = "androidx.compose", name = "compose-bom", version.ref = "compose-bom" }
ui = { group = "androidx.compose.ui", name = "ui" }
ui-graphics = { group = "androidx.compose.ui", name = "ui-graphics" }
ui-tooling = { group = "androidx.compose.ui", name = "ui-tooling" }
ui-tooling-preview = { group = "androidx.compose.ui", name = "ui-tooling-preview" }
ui-test-manifest = { group = "androidx.compose.ui", name = "ui-test-manifest" }
ui-test-junit4 = { group = "androidx.compose.ui", name = "ui-test-junit4" }
material3 = { group = "androidx.compose.material3", name = "material3" }
[plugins]
androidApplication = { id = "com.android.application", version.ref = "agp" }
kotlinAndroid = { id = "org.jetbrains.kotlin.android", version.ref = "kotlin" }
Binary file not shown.
@@ -1,6 +0,0 @@
#Wed Oct 25 07:40:52 PDT 2023
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-8.4-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
-185
View File
@@ -1,185 +0,0 @@
#!/usr/bin/env sh
#
# Copyright 2015 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
##############################################################################
##
## Gradle start up script for UN*X
##
##############################################################################
# Attempt to set APP_HOME
# Resolve links: $0 may be a link
PRG="$0"
# Need this for relative symlinks.
while [ -h "$PRG" ] ; do
ls=`ls -ld "$PRG"`
link=`expr "$ls" : '.*-> \(.*\)$'`
if expr "$link" : '/.*' > /dev/null; then
PRG="$link"
else
PRG=`dirname "$PRG"`"/$link"
fi
done
SAVED="`pwd`"
cd "`dirname \"$PRG\"`/" >/dev/null
APP_HOME="`pwd -P`"
cd "$SAVED" >/dev/null
APP_NAME="Gradle"
APP_BASE_NAME=`basename "$0"`
# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
# Use the maximum available, or set MAX_FD != -1 to use that value.
MAX_FD="maximum"
warn () {
echo "$*"
}
die () {
echo
echo "$*"
echo
exit 1
}
# OS specific support (must be 'true' or 'false').
cygwin=false
msys=false
darwin=false
nonstop=false
case "`uname`" in
CYGWIN* )
cygwin=true
;;
Darwin* )
darwin=true
;;
MINGW* )
msys=true
;;
NONSTOP* )
nonstop=true
;;
esac
CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar
# Determine the Java command to use to start the JVM.
if [ -n "$JAVA_HOME" ] ; then
if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
# IBM's JDK on AIX uses strange locations for the executables
JAVACMD="$JAVA_HOME/jre/sh/java"
else
JAVACMD="$JAVA_HOME/bin/java"
fi
if [ ! -x "$JAVACMD" ] ; then
die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME
Please set the JAVA_HOME variable in your environment to match the
location of your Java installation."
fi
else
JAVACMD="java"
which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
Please set the JAVA_HOME variable in your environment to match the
location of your Java installation."
fi
# Increase the maximum file descriptors if we can.
if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then
MAX_FD_LIMIT=`ulimit -H -n`
if [ $? -eq 0 ] ; then
if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then
MAX_FD="$MAX_FD_LIMIT"
fi
ulimit -n $MAX_FD
if [ $? -ne 0 ] ; then
warn "Could not set maximum file descriptor limit: $MAX_FD"
fi
else
warn "Could not query maximum file descriptor limit: $MAX_FD_LIMIT"
fi
fi
# For Darwin, add options to specify how the application appears in the dock
if $darwin; then
GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\""
fi
# For Cygwin or MSYS, switch paths to Windows format before running java
if [ "$cygwin" = "true" -o "$msys" = "true" ] ; then
APP_HOME=`cygpath --path --mixed "$APP_HOME"`
CLASSPATH=`cygpath --path --mixed "$CLASSPATH"`
JAVACMD=`cygpath --unix "$JAVACMD"`
# We build the pattern for arguments to be converted via cygpath
ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null`
SEP=""
for dir in $ROOTDIRSRAW ; do
ROOTDIRS="$ROOTDIRS$SEP$dir"
SEP="|"
done
OURCYGPATTERN="(^($ROOTDIRS))"
# Add a user-defined pattern to the cygpath arguments
if [ "$GRADLE_CYGPATTERN" != "" ] ; then
OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)"
fi
# Now convert the arguments - kludge to limit ourselves to /bin/sh
i=0
for arg in "$@" ; do
CHECK=`echo "$arg"|egrep -c "$OURCYGPATTERN" -`
CHECK2=`echo "$arg"|egrep -c "^-"` ### Determine if an option
if [ $CHECK -ne 0 ] && [ $CHECK2 -eq 0 ] ; then ### Added a condition
eval `echo args$i`=`cygpath --path --ignore --mixed "$arg"`
else
eval `echo args$i`="\"$arg\""
fi
i=`expr $i + 1`
done
case $i in
0) set -- ;;
1) set -- "$args0" ;;
2) set -- "$args0" "$args1" ;;
3) set -- "$args0" "$args1" "$args2" ;;
4) set -- "$args0" "$args1" "$args2" "$args3" ;;
5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;;
6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;;
7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;;
8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;;
9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;;
esac
fi
# Escape application args
save () {
for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done
echo " "
}
APP_ARGS=`save "$@"`
# Collect all arguments for the java command, following the shell quoting and substitution rules
eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS"
exec "$JAVACMD" "$@"
-89
View File
@@ -1,89 +0,0 @@
@rem
@rem Copyright 2015 the original author or authors.
@rem
@rem Licensed under the Apache License, Version 2.0 (the "License");
@rem you may not use this file except in compliance with the License.
@rem You may obtain a copy of the License at
@rem
@rem https://www.apache.org/licenses/LICENSE-2.0
@rem
@rem Unless required by applicable law or agreed to in writing, software
@rem distributed under the License is distributed on an "AS IS" BASIS,
@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@rem See the License for the specific language governing permissions and
@rem limitations under the License.
@rem
@if "%DEBUG%" == "" @echo off
@rem ##########################################################################
@rem
@rem Gradle startup script for Windows
@rem
@rem ##########################################################################
@rem Set local scope for the variables with windows NT shell
if "%OS%"=="Windows_NT" setlocal
set DIRNAME=%~dp0
if "%DIRNAME%" == "" set DIRNAME=.
set APP_BASE_NAME=%~n0
set APP_HOME=%DIRNAME%
@rem Resolve any "." and ".." in APP_HOME to make it shorter.
for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi
@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m"
@rem Find java.exe
if defined JAVA_HOME goto findJavaFromJavaHome
set JAVA_EXE=java.exe
%JAVA_EXE% -version >NUL 2>&1
if "%ERRORLEVEL%" == "0" goto execute
echo.
echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
echo.
echo Please set the JAVA_HOME variable in your environment to match the
echo location of your Java installation.
goto fail
:findJavaFromJavaHome
set JAVA_HOME=%JAVA_HOME:"=%
set JAVA_EXE=%JAVA_HOME%/bin/java.exe
if exist "%JAVA_EXE%" goto execute
echo.
echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME%
echo.
echo Please set the JAVA_HOME variable in your environment to match the
echo location of your Java installation.
goto fail
:execute
@rem Setup the command line
set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar
@rem Execute Gradle
"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %*
:end
@rem End local scope for the variables with windows NT shell
if "%ERRORLEVEL%"=="0" goto mainEnd
:fail
rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of
rem the _cmd.exe /c_ return code!
if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1
exit /b 1
:mainEnd
if "%OS%"=="Windows_NT" endlocal
:omega
@@ -1,24 +0,0 @@
pluginManagement {
repositories {
google {
content {
includeGroupByRegex("com\\.android.*")
includeGroupByRegex("com\\.google.*")
includeGroupByRegex("androidx.*")
}
}
mavenCentral()
gradlePluginPortal()
}
}
dependencyResolutionManagement {
repositoriesMode.set(RepositoriesMode.FAIL_ON_PROJECT_REPOS)
repositories {
google()
mavenCentral()
}
}
rootProject.name = "BT Bench"
include(":app")
@@ -1,57 +0,0 @@
package com.github.google.bumble.remotehci
import java.io.IOException
class CommandLineInterface {
companion object {
fun printUsage() {
System.out.println("usage: <launch-command> [-h|--help] [<tcp-port>]")
}
@JvmStatic fun main(args: Array<String>) {
System.out.println("Starting proxy")
var tcpPort = DEFAULT_TCP_PORT
if (args.isNotEmpty()) {
if (args[0] == "-h" || args[0] == "--help") {
printUsage()
return
}
try {
tcpPort = args[0].toInt()
} catch (error: NumberFormatException) {
System.out.println("ERROR: invalid TCP port argument")
printUsage()
return
}
}
try {
val hciProxy = HciProxy(tcpPort, object : HciProxy.Listener {
override fun onHostConnectionState(connected: Boolean) {
}
override fun onHciPacketCountChange(
commandPacketsReceived: Int,
aclPacketsReceived: Int,
scoPacketsReceived: Int,
eventPacketsSent: Int,
aclPacketsSent: Int,
scoPacketsSent: Int
) {
}
override fun onMessage(message: String?) {
System.out.println(message)
}
})
hciProxy.run()
} catch (error: IOException) {
System.err.println("Exception while running HCI Server: $error")
} catch (error: HciProxy.HalException) {
System.err.println("HAL exception: ${error.message}")
}
}
}
}
@@ -1,5 +1,5 @@
[versions]
agp = "8.2.0"
agp = "8.3.0-alpha11"
kotlin = "1.8.10"
core-ktx = "1.9.0"
junit = "4.13.2"
+2 -19
View File
@@ -12,13 +12,6 @@ keywords = ["bluetooth", "ble"]
categories = ["api-bindings", "network-programming"]
rust-version = "1.70.0"
# https://github.com/frewsxcv/cargo-all-features#options
[package.metadata.cargo-all-features]
# We are interested in testing subset combinations of this feature, so this is redundant
denylist = ["unstable"]
# To exercise combinations of any of these features, remove from `always_include_features`
always_include_features = ["anyhow", "pyo3-asyncio-attributes", "dev-tools", "bumble-tools"]
[dependencies]
pyo3 = { version = "0.18.3", features = ["macros"] }
pyo3-asyncio = { version = "0.18.0", features = ["tokio-runtime"] }
@@ -33,7 +26,6 @@ thiserror = "1.0.41"
bytes = "1.5.0"
pdl-derive = "0.2.0"
pdl-runtime = "0.2.0"
futures = "0.3.28"
# Dev tools
file-header = { version = "0.1.2", optional = true }
@@ -44,6 +36,7 @@ anyhow = { version = "1.0.71", optional = true }
clap = { version = "4.3.3", features = ["derive"], optional = true }
directories = { version = "5.0.1", optional = true }
env_logger = { version = "0.10.0", optional = true }
futures = { version = "0.3.28", optional = true }
log = { version = "0.4.19", optional = true }
owo-colors = { version = "3.5.0", optional = true }
reqwest = { version = "0.11.20", features = ["blocking"], optional = true }
@@ -81,11 +74,6 @@ name = "bumble"
path = "src/main.rs"
required-features = ["bumble-tools"]
[[example]]
name = "broadcast"
path = "examples/broadcast.rs"
required-features = ["unstable_extended_adv"]
# test entry point that uses pyo3_asyncio's test harness
[[test]]
name = "pytests"
@@ -97,10 +85,5 @@ anyhow = ["pyo3/anyhow"]
pyo3-asyncio-attributes = ["pyo3-asyncio/attributes"]
dev-tools = ["dep:anyhow", "dep:clap", "dep:file-header", "dep:globset"]
# separate feature for CLI so that dependencies don't spend time building these
bumble-tools = ["dep:clap", "anyhow", "dep:anyhow", "dep:directories", "pyo3-asyncio-attributes", "dep:owo-colors", "dep:reqwest", "dep:rusb", "dep:log", "dep:env_logger"]
# all the unstable features
unstable = ["unstable_extended_adv"]
unstable_extended_adv = []
bumble-tools = ["dep:clap", "anyhow", "dep:anyhow", "dep:directories", "pyo3-asyncio-attributes", "dep:owo-colors", "dep:reqwest", "dep:rusb", "dep:log", "dep:env_logger", "dep:futures"]
default = []
+6 -3
View File
@@ -33,7 +33,6 @@
use bumble::wrapper::{
device::{Device, Peer},
hci::{packets::AddressType, Address},
profile::BatteryServiceProxy,
transport::Transport,
PyObjectExt,
@@ -53,8 +52,12 @@ async fn main() -> PyResult<()> {
let transport = Transport::open(cli.transport).await?;
let address = Address::new("F0:F1:F2:F3:F4:F5", AddressType::RandomDeviceAddress)?;
let device = Device::with_hci("Bumble", address, transport.source()?, transport.sink()?)?;
let device = Device::with_hci(
"Bumble",
"F0:F1:F2:F3:F4:F5",
transport.source()?,
transport.sink()?,
)?;
device.power_on().await?;
+5 -21
View File
@@ -63,28 +63,17 @@ async fn main() -> PyResult<()> {
)
.map_err(|e| anyhow!(e))?;
device.set_advertising_data(adv_data)?;
device.power_on().await?;
if cli.extended {
println!("Starting extended advertisement...");
device.start_advertising_extended(adv_data).await?;
} else {
device.set_advertising_data(adv_data)?;
println!("Starting legacy advertisement...");
device.start_advertising(true).await?;
}
println!("Advertising...");
device.start_advertising(true).await?;
// wait until user kills the process
tokio::signal::ctrl_c().await?;
if cli.extended {
println!("Stopping extended advertisement...");
device.stop_advertising_extended().await?;
} else {
println!("Stopping legacy advertisement...");
device.stop_advertising().await?;
}
println!("Stopping...");
device.stop_advertising().await?;
Ok(())
}
@@ -97,17 +86,12 @@ struct Cli {
/// See, for instance, `examples/device1.json` in the Python project.
#[arg(long)]
device_config: path::PathBuf,
/// Bumble transport spec.
///
/// <https://google.github.io/bumble/transports/index.html>
#[arg(long)]
transport: String,
/// Whether to perform an extended (BT 5.0) advertisement
#[arg(long)]
extended: bool,
/// Log HCI commands
#[arg(long)]
log_hci: bool,
+7 -5
View File
@@ -20,9 +20,7 @@
use bumble::{
adv::CommonDataType,
wrapper::{
core::AdvertisementDataUnit,
device::Device,
hci::{packets::AddressType, Address},
core::AdvertisementDataUnit, device::Device, hci::packets::AddressType,
transport::Transport,
},
};
@@ -46,8 +44,12 @@ async fn main() -> PyResult<()> {
let transport = Transport::open(cli.transport).await?;
let address = Address::new("F0:F1:F2:F3:F4:F5", AddressType::RandomDeviceAddress)?;
let mut device = Device::with_hci("Bumble", address, transport.source()?, transport.sink()?)?;
let mut device = Device::with_hci(
"Bumble",
"F0:F1:F2:F3:F4:F5",
transport.source()?,
transport.sink()?,
)?;
// in practice, devices can send multiple advertisements from the same address, so we keep
// track of a timestamp for each set of data
+77
View File
@@ -0,0 +1,77 @@
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use bumble::wrapper::{
controller::Controller,
device::Device,
drivers::rtk::DriverInfo,
hci::{
packets::{
AddressType, ErrorCode, ReadLocalVersionInformationBuilder,
ReadLocalVersionInformationComplete,
},
Address, Error,
},
host::Host,
link::Link,
transport::Transport,
};
use nix::sys::stat::Mode;
use pyo3::{
exceptions::PyException,
{PyErr, PyResult},
};
#[pyo3_asyncio::tokio::test]
async fn fifo_transport_can_open() -> PyResult<()> {
let dir = tempfile::tempdir().unwrap();
let mut fifo = dir.path().to_path_buf();
fifo.push("bumble-transport-fifo");
nix::unistd::mkfifo(&fifo, Mode::S_IRWXU).unwrap();
let mut t = Transport::open(format!("file:{}", fifo.to_str().unwrap())).await?;
t.close().await?;
Ok(())
}
#[pyo3_asyncio::tokio::test]
async fn realtek_driver_info_all_drivers() -> PyResult<()> {
assert_eq!(12, DriverInfo::all_drivers()?.len());
Ok(())
}
#[pyo3_asyncio::tokio::test]
async fn hci_command_wrapper_has_correct_methods() -> PyResult<()> {
let address = Address::new("F0:F1:F2:F3:F4:F5", &AddressType::RandomDeviceAddress)?;
let link = Link::new_local_link()?;
let controller = Controller::new("C1", None, None, Some(link), Some(address.clone())).await?;
let host = Host::new(controller.clone().into(), controller.into()).await?;
let device = Device::new(None, Some(address), None, Some(host), None)?;
device.power_on().await?;
// Send some simple command. A successful response means [HciCommandWrapper] has the minimum
// required interface for the Python code to think its an [HCI_Command] object.
let command = ReadLocalVersionInformationBuilder {};
let event: ReadLocalVersionInformationComplete = device
.send_command(&command.into(), true)
.await?
.try_into()
.map_err(|e: Error| PyErr::new::<PyException, _>(e.to_string()))?;
assert_eq!(ErrorCode::Success, event.get_status());
Ok(())
}
-22
View File
@@ -1,22 +0,0 @@
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use bumble::wrapper::drivers::rtk::DriverInfo;
use pyo3::PyResult;
#[pyo3_asyncio::tokio::test]
async fn realtek_driver_info_all_drivers() -> PyResult<()> {
assert_eq!(12, DriverInfo::all_drivers()?.len());
Ok(())
}
-86
View File
@@ -1,86 +0,0 @@
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use bumble::wrapper::{
controller::Controller,
device::Device,
hci::{
packets::{
AddressType, Enable, ErrorCode, LeScanType, LeScanningFilterPolicy,
LeSetScanEnableBuilder, LeSetScanEnableComplete, LeSetScanParametersBuilder,
LeSetScanParametersComplete, OwnAddressType,
},
Address, Error,
},
host::Host,
link::Link,
};
use pyo3::{
exceptions::PyException,
{PyErr, PyResult},
};
#[pyo3_asyncio::tokio::test]
async fn test_hci_roundtrip_success_and_failure() -> PyResult<()> {
let address = Address::new("F0:F1:F2:F3:F4:F5", AddressType::RandomDeviceAddress)?;
let device = create_local_device(address).await?;
device.power_on().await?;
// BLE Spec Core v5.3
// 7.8.9 LE Set Scan Parameters command
// ...
// The Host shall not issue this command when scanning is enabled in the
// Controller; if it is the Command Disallowed error code shall be used.
// ...
let command = LeSetScanEnableBuilder {
filter_duplicates: Enable::Disabled,
// will cause failure later
le_scan_enable: Enable::Enabled,
};
let event: LeSetScanEnableComplete = device
.send_command(command.into(), false)
.await?
.try_into()
.map_err(|e: Error| PyErr::new::<PyException, _>(e.to_string()))?;
assert_eq!(ErrorCode::Success, event.get_status());
let command = LeSetScanParametersBuilder {
le_scan_type: LeScanType::Passive,
le_scan_interval: 0,
le_scan_window: 0,
own_address_type: OwnAddressType::RandomDeviceAddress,
scanning_filter_policy: LeScanningFilterPolicy::AcceptAll,
};
let event: LeSetScanParametersComplete = device
.send_command(command.into(), false)
.await?
.try_into()
.map_err(|e: Error| PyErr::new::<PyException, _>(e.to_string()))?;
assert_eq!(ErrorCode::CommandDisallowed, event.get_status());
Ok(())
}
async fn create_local_device(address: Address) -> PyResult<Device> {
let link = Link::new_local_link()?;
let controller = Controller::new("C1", None, None, Some(link), Some(address.clone())).await?;
let host = Host::new(controller.clone().into(), controller.into()).await?;
Device::new(None, Some(address), None, Some(host), None)
}

Some files were not shown because too many files have changed in this diff Show More