Compare commits

...

12 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod
d7ce62beaa Merge pull request #18 from turon/docs/quick_start
[docs] Add some getting started information to the top-level README.
2022-07-31 12:00:36 -07:00
Gilles Boccon-Gibod
0e2a184edb Merge pull request #17 from mogenson/console_py_do_write
Implement 'write' command for console.py
2022-07-30 16:02:47 -07:00
Martin Turon
e6ee5ae996 [docs] Add references to some of the docs to the top-level for discoverability. 2022-07-30 14:18:08 -07:00
Martin Turon
f1836e659f [docs] Add some getting started information to the top-level README. 2022-07-30 14:13:55 -07:00
Michael Mogenson
99218d3abf Implement 'write' command for console.py
Syntax is `write <attribute> <value>`. Supports a value of type string,
hexadecimal string, or integer.

Ex:
- `write 180D.2A38 hello`
- `write 180D.2A38 0xbeef`
- `write 180D.2A38 123`

Write with response method is used if supported by characteristic,
otherwise write without response.

Add a find_attribute() method to consolidate common logic of finding a
characteristic or attribute handle in `do_read()` and `do_write()`.

Tested with run_gatt_server.py example to verify sent data.
2022-07-29 19:45:24 -04:00
Gilles Boccon-Gibod
b5ba0bef63 Merge pull request #16 from google/jdm/connection-context-manager
Adding in Device.connected_to context manager and Peer.sustain
2022-07-27 17:25:06 -07:00
Jayson Messenger
9cd1890faa Adding in context manager for Connection and Peer classes
* Connection implements async context manager to disconnect when
  context is left
    * The Connection only calls disconnect if the context manager exits
      without an exception
* Peer implements async context manager to discover when entering the
  context
* Device.connect_as_gatt implements an async context manager to nest the
  connection and peer context managers
* Added HCI_StatusError that can be raised when a HCI Command Status
  event is received that doesn't show "PENDING" as status
* Added Connection.sustain to wait for a timeout or disconnect
* Peer.sustain also maps to Connectin.sustain
* Updated battery_client.py to use .connect_as_gatt and .sustain
* Updated heart_rate_client.py to use .connect_as_gatt and .sustain
2022-07-27 14:03:12 -04:00
Gilles Boccon-Gibod
472702a9d9 Merge pull request #12 from google/gbg/more-hci-types
more hci types
2022-07-26 18:00:21 -07:00
Gilles Boccon-Gibod
b38740e5b7 Merge pull request #15 from google/gbg/hr-profile
add support for the heart rate service
2022-07-26 13:17:22 -07:00
Gilles Boccon-Gibod
009ecfce96 use list comprehension 2022-07-19 19:53:18 -07:00
Gilles Boccon-Gibod
d6075df356 add tool 2022-07-19 19:53:18 -07:00
Gilles Boccon-Gibod
ebd0a0c8ca more complete set of HCI types and constants 2022-07-19 19:53:18 -07:00
11 changed files with 1626 additions and 648 deletions

View File

@@ -21,6 +21,29 @@ or see the documentation source under `docs/mkdocs/src`, or build the static HTM
mkdocs build -f docs/mkdocs/mkdocs.yml
```
## Usage
### Getting Started
For a quick start to using Bumble, see the [Getting Started](docs/mkdocs/src/getting_started.md) guide.
### Dependencies
To install package dependencies needed to run the bumble examples execute the following commands:
```
python -m pip install --upgrade pip
python -m pip install ".[test,development,documentation]"
```
### Examples
Refer to the [Example Documentation](examples/README.md) for details on the included example scripts and how to run them.
The complete [list of Examples](/docs/mkdocs/src/examples/index.md), and what they are designed to do is here.
There are also a set of [Apps and Tools](docs/mkdocs/src/apps_and_tools/index.md) that show the utility of Bumble.
## License
Licensed under the [Apache 2.0](LICENSE) License.

View File

@@ -32,6 +32,7 @@ from bumble.core import UUID, AdvertisingData
from bumble.device import Device, Connection, Peer
from bumble.utils import AsyncRunner
from bumble.transport import open_transport_or_link
from bumble.gatt import Characteristic
from prompt_toolkit import Application
from prompt_toolkit.history import FileHistory
@@ -330,9 +331,24 @@ class ConsoleApp:
await self.show_attributes(attributes)
def find_attribute(self, param):
parts = param.split('.')
if len(parts) == 2:
service_uuid = UUID(parts[0]) if parts[0] != '*' else None
characteristic_uuid = UUID(parts[1])
for service in self.connected_peer.services:
if service_uuid is None or service.uuid == service_uuid:
for characteristic in service.characteristics:
if characteristic.uuid == characteristic_uuid:
return characteristic
elif len(parts) == 1:
if parts[0].startswith('#'):
attribute_handle = int(f'{parts[0][1:]}', 16)
return attribute_handle
async def command(self, command):
try:
(keyword, *params) = command.strip().split(' ', 1)
(keyword, *params) = command.strip().split(' ')
keyword = keyword.replace('-', '_').lower()
handler = getattr(self, f'do_{keyword}', None)
if handler:
@@ -441,26 +457,46 @@ class ConsoleApp:
self.show_error('invalid syntax', 'expected read <attribute>')
return
parts = params[0].split('.')
if len(parts) == 2:
service_uuid = UUID(parts[0]) if parts[0] != '*' else None
characteristic_uuid = UUID(parts[1])
for service in self.connected_peer.services:
if service_uuid is None or service.uuid == service_uuid:
for characteristic in service.characteristics:
if characteristic.uuid == characteristic_uuid:
value = await self.connected_peer.read_value(characteristic)
self.append_to_output(f'VALUE: {value}')
return
attribute = self.find_attribute(params[0])
if attribute is None:
self.show_error('no such characteristic')
elif len(parts) == 1:
if parts[0].startswith('#'):
attribute_handle = int(f'{parts[0][1:]}', 16)
value = await self.connected_peer.read_value(attribute_handle)
self.append_to_output(f'VALUE: {value}')
return
return
value = await self.connected_peer.read_value(attribute)
self.append_to_output(f'VALUE: {value}')
async def do_write(self, params):
if not self.connected_peer:
self.show_error('not connected')
return
if len(params) != 2:
self.show_error('invalid syntax', 'expected write <attribute> <value>')
return
if params[1].upper().startswith("0X"):
value = bytes.fromhex(params[1][2:]) # parse as hex string
else:
try:
value = int(params[1]) # try as integer
except ValueError:
value = str.encode(params[1]) # must be a string
attribute = self.find_attribute(params[0])
if attribute is None:
self.show_error('no such characteristic')
return
# use write with response if supported
with_response = (
(attribute.properties & Characteristic.WRITE)
if hasattr(attribute, "properties")
else False
)
await self.connected_peer.write_value(
attribute, value, with_response=with_response
)
async def do_exit(self, params):
self.ui.exit()

105
apps/controller_info.py Normal file
View File

@@ -0,0 +1,105 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import os
import logging
import click
from colors import color
from bumble.company_ids import COMPANY_IDENTIFIERS
from bumble.core import name_or_number
from bumble.hci import (
map_null_terminated_utf8_string,
HCI_LE_SUPPORTED_FEATURES_NAMES,
HCI_SUCCESS,
HCI_VERSION_NAMES,
LMP_VERSION_NAMES,
HCI_Command,
HCI_Read_BD_ADDR_Command,
HCI_READ_BD_ADDR_COMMAND,
HCI_Read_Local_Name_Command,
HCI_READ_LOCAL_NAME_COMMAND
)
from bumble.host import Host
from bumble.transport import open_transport_or_link
# -----------------------------------------------------------------------------
async def get_classic_info(host):
if host.supports_command(HCI_READ_BD_ADDR_COMMAND):
response = await host.send_command(HCI_Read_BD_ADDR_Command())
if response.return_parameters.status == HCI_SUCCESS:
print()
print(color('Classic Address:', 'yellow'), response.return_parameters.bd_addr)
if host.supports_command(HCI_READ_LOCAL_NAME_COMMAND):
response = await host.send_command(HCI_Read_Local_Name_Command())
if response.return_parameters.status == HCI_SUCCESS:
print()
print(color('Local Name:', 'yellow'), map_null_terminated_utf8_string(response.return_parameters.local_name))
# -----------------------------------------------------------------------------
async def get_le_info(host):
print()
print(color('LE Features:', 'yellow'))
for feature in host.supported_le_features:
print(' ', name_or_number(HCI_LE_SUPPORTED_FEATURES_NAMES, feature))
# -----------------------------------------------------------------------------
async def async_main(transport):
print('<<< connecting to HCI...')
async with await open_transport_or_link(transport) as (hci_source, hci_sink):
print('<<< connected')
host = Host(hci_source, hci_sink)
await host.reset()
# Print version
print(color('Version:', 'yellow'))
print(color(' Manufacturer: ', 'green'), name_or_number(COMPANY_IDENTIFIERS, host.local_version.company_identifier))
print(color(' HCI Version: ', 'green'), name_or_number(HCI_VERSION_NAMES, host.local_version.hci_version))
print(color(' HCI Subversion:', 'green'), host.local_version.hci_subversion)
print(color(' LMP Version: ', 'green'), name_or_number(LMP_VERSION_NAMES, host.local_version.lmp_version))
print(color(' LMP Subversion:', 'green'), host.local_version.lmp_subversion)
# Get the Classic info
await get_classic_info(host)
# Get the LE info
await get_le_info(host)
# Print the list of commands supported by the controller
print()
print(color('Supported Commands:', 'yellow'))
for command in host.supported_commands:
print(' ', HCI_Command.command_name(command))
# -----------------------------------------------------------------------------
@click.command()
@click.argument('transport')
def main(transport):
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper())
asyncio.run(async_main(transport))
# -----------------------------------------------------------------------------
if __name__ == '__main__':
main()

View File

@@ -77,7 +77,7 @@ class Controller:
self.le_features = bytes.fromhex('ff49010000000000')
self.le_states = bytes.fromhex('ffff3fffff030000')
self.avertising_channel_tx_power = 0
self.white_list_size = 8
self.filter_accept_list_size = 8
self.resolving_list_size = 8
self.supported_max_tx_octets = 27
self.supported_max_tx_time = 10000 # microseconds
@@ -731,27 +731,27 @@ class Controller:
'''
return bytes([HCI_SUCCESS])
def on_hci_le_read_white_list_size_command(self, command):
def on_hci_le_read_filter_accept_list_size_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.14 LE Read White List Size Command
See Bluetooth spec Vol 2, Part E - 7.8.14 LE Read Filter Accept List Size Command
'''
return bytes([HCI_SUCCESS, self.white_list_size])
return bytes([HCI_SUCCESS, self.filter_accept_list_size])
def on_hci_le_clear_white_list_command(self, command):
def on_hci_le_clear_filter_accept_list_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.15 LE Clear White List Command
See Bluetooth spec Vol 2, Part E - 7.8.15 LE Clear Filter Accept List Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_add_device_to_white_list_command(self, command):
def on_hci_le_add_device_to_filter_accept_list_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.16 LE Add Device To White List Command
See Bluetooth spec Vol 2, Part E - 7.8.16 LE Add Device To Filter Accept List Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_remove_device_from_white_list_command(self, command):
def on_hci_le_remove_device_from_filter_accept_list_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.17 LE Remove Device From White List Command
See Bluetooth spec Vol 2, Part E - 7.8.17 LE Remove Device From Filter Accept List Command
'''
return bytes([HCI_SUCCESS])
@@ -780,9 +780,9 @@ class Controller:
'''
return bytes([HCI_SUCCESS]) + struct.pack('Q', random.randint(0, 1 << 64))
def on_hci_le_start_encryption_command(self, command):
def on_hci_le_enable_encryption_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.24 LE Start Encryption Command
See Bluetooth spec Vol 2, Part E - 7.8.24 LE Enable Encryption Command
'''
# Check the parameters

View File

@@ -18,6 +18,7 @@
import json
import asyncio
import logging
from contextlib import asynccontextmanager, AsyncExitStack
from .hci import *
from .host import Host
@@ -148,10 +149,24 @@ class Peer:
await service.discover_characteristics()
return self.create_service_proxy(proxy_class)
async def sustain(self, timeout=None):
await self.connection.sustain(timeout)
# [Classic only]
async def request_name(self):
return await self.connection.request_remote_name()
async def __aenter__(self):
await self.discover_services()
for service in self.services:
await self.discover_characteristics()
return self
async def __aexit__(self, exc_type, exc_value, traceback):
pass
def __str__(self):
return f'{self.connection.peer_address} as {self.connection.role_name}'
@@ -232,6 +247,21 @@ class Connection(CompositeEventEmitter):
async def encrypt(self):
return await self.device.encrypt(self)
async def sustain(self, timeout=None):
""" Idles the current task waiting for a disconnect or timeout """
abort = asyncio.get_running_loop().create_future()
self.on('disconnection', abort.set_result)
self.on('disconnection_failure', abort.set_exception)
try:
await asyncio.wait_for(abort, timeout)
except asyncio.TimeoutError:
pass
self.remove_listener('disconnection', abort.set_result)
self.remove_listener('disconnection_failure', abort.set_exception)
async def update_parameters(
self,
conn_interval_min,
@@ -251,6 +281,18 @@ class Connection(CompositeEventEmitter):
async def request_remote_name(self):
return await self.device.request_remote_name(self)
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_value, traceback):
if exc_type is None:
try:
await self.disconnect()
except HCI_StatusError as e:
# Invalid parameter means the connection is no longer valid
if e.error_code != HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR:
raise
def __str__(self):
return f'Connection(handle=0x{self.handle:04X}, role={self.role_name}, address={self.peer_address})'
@@ -314,6 +356,7 @@ class DeviceConfiguration:
# within a class requires unnecessarily complicated acrobatics)
# -----------------------------------------------------------------------------
# Decorator that converts the first argument from a connection handle to a connection
def with_connection_from_handle(function):
@functools.wraps(function)
@@ -704,7 +747,7 @@ class Device(CompositeEventEmitter):
))
if response.status != HCI_Command_Status_Event.PENDING:
self.discovering = False
raise RuntimeError(f'HCI_Inquiry command failed: {HCI_Constant.status_name(response.status)} ({response.status})')
raise HCI_StatusError(response)
self.discovering = True
@@ -785,7 +828,7 @@ class Device(CompositeEventEmitter):
try:
peer_address = Address(peer_address)
except ValueError:
# If the address is not parssable, assume it is a name instead
# If the address is not parsable, assume it is a name instead
logger.debug('looking for peer by name')
peer_address = await self.find_peer_by_name(peer_address, transport)
@@ -824,16 +867,25 @@ class Device(CompositeEventEmitter):
try:
if result.status != HCI_Command_Status_Event.PENDING:
raise RuntimeError(f'HCI_LE_Create_Connection_Command failed: {HCI_Constant.status_name(result.status)} ({result.status})')
raise HCI_StatusError(result)
# Wait for the connection process to complete
self.connecting = True
return await pending_connection
finally:
self.remove_listener('connection', pending_connection.set_result)
self.remove_listener('connection_failure', pending_connection.set_exception)
self.connecting = False
@asynccontextmanager
async def connect_as_gatt(self, peer_address):
async with AsyncExitStack() as stack:
connection = await stack.enter_async_context(await self.connect(peer_address))
peer = await stack.enter_async_context(Peer(connection))
yield peer
@property
def is_connecting(self):
return self.connecting
@@ -858,7 +910,7 @@ class Device(CompositeEventEmitter):
try:
if result.status != HCI_Command_Status_Event.PENDING:
raise RuntimeError(f'HCI_Disconnect_Command failed: {HCI_Constant.status_name(result.status)} ({result.status})')
raise HCI_StatusError(result)
# Wait for the disconnection process to complete
self.disconnecting = True
@@ -1010,7 +1062,7 @@ class Device(CompositeEventEmitter):
)
if result.status != HCI_COMMAND_STATUS_PENDING:
logger.warn(f'HCI_Authentication_Requested_Command failed: {HCI_Constant.error_name(result.status)}')
raise HCI_Error(result.status)
raise HCI_StatusError(result)
# Wait for the authentication to complete
await pending_authentication
@@ -1057,7 +1109,7 @@ class Device(CompositeEventEmitter):
raise InvalidStateError('only centrals can start encryption')
result = await self.send_command(
HCI_LE_Start_Encryption_Command(
HCI_LE_Enable_Encryption_Command(
connection_handle = connection.handle,
random_number = rand,
encrypted_diversifier = ediv,
@@ -1066,8 +1118,8 @@ class Device(CompositeEventEmitter):
)
if result.status != HCI_COMMAND_STATUS_PENDING:
logger.warn(f'HCI_LE_Start_Encryption_Command failed: {HCI_Constant.error_name(result.status)}')
raise HCI_Error(result.status)
logger.warn(f'HCI_LE_Enable_Encryption_Command failed: {HCI_Constant.error_name(result.status)}')
raise HCI_StatusError(result)
else:
result = await self.send_command(
HCI_Set_Connection_Encryption_Command(
@@ -1078,7 +1130,7 @@ class Device(CompositeEventEmitter):
if result.status != HCI_COMMAND_STATUS_PENDING:
logger.warn(f'HCI_Set_Connection_Encryption_Command failed: {HCI_Constant.error_name(result.status)}')
raise HCI_Error(result.status)
raise HCI_StatusError(result)
# Wait for the result
await pending_encryption
@@ -1112,7 +1164,7 @@ class Device(CompositeEventEmitter):
if result.status != HCI_COMMAND_STATUS_PENDING:
logger.warn(f'HCI_Set_Connection_Encryption_Command failed: {HCI_Constant.error_name(result.status)}')
raise HCI_Error(result.status)
raise HCI_StatusError(result)
# Wait for the result
return await pending_name

File diff suppressed because it is too large Load Diff

View File

@@ -81,7 +81,9 @@ class Host(EventEmitter):
self.hc_total_num_acl_data_packets = HOST_HC_TOTAL_NUM_ACL_DATA_PACKETS
self.acl_packet_queue = collections.deque()
self.acl_packets_in_flight = 0
self.local_version = None
self.local_supported_commands = bytes(64)
self.local_le_features = 0
self.command_semaphore = asyncio.Semaphore(1)
self.long_term_key_provider = None
self.link_key_provider = None
@@ -97,34 +99,51 @@ class Host(EventEmitter):
await self.send_command(HCI_Reset_Command())
self.ready = True
response = await self.send_command(HCI_Read_Local_Supported_Commands_Command())
if response.return_parameters.status != HCI_SUCCESS:
raise ProtocolError(response.return_parameters.status, 'hci')
self.local_supported_commands = response.return_parameters.supported_commands
await self.send_command(HCI_Set_Event_Mask_Command(event_mask = bytes.fromhex('FFFFFFFFFFFFFFFF')))
await self.send_command(HCI_LE_Set_Event_Mask_Command(le_event_mask = bytes.fromhex('FFFFF00000000000')))
await self.send_command(HCI_Read_Local_Version_Information_Command())
await self.send_command(HCI_Write_LE_Host_Support_Command(le_supported_host = 1, simultaneous_le_host = 0))
response = await self.send_command(HCI_LE_Read_Buffer_Size_Command())
response = await self.send_command(HCI_Read_Local_Supported_Commands_Command())
if response.return_parameters.status == HCI_SUCCESS:
self.hc_le_acl_data_packet_length = response.return_parameters.hc_le_acl_data_packet_length
self.hc_total_num_le_acl_data_packets = response.return_parameters.hc_total_num_le_acl_data_packets
logger.debug(f'HCI LE ACL flow control: hc_le_acl_data_packet_length={response.return_parameters.hc_le_acl_data_packet_length}, hc_total_num_le_acl_data_packets={response.return_parameters.hc_total_num_le_acl_data_packets}')
self.local_supported_commands = response.return_parameters.supported_commands
else:
logger.warn(f'HCI_LE_Read_Buffer_Size_Command failed: {response.return_parameters.status}')
if response.return_parameters.hc_le_acl_data_packet_length == 0 or response.return_parameters.hc_total_num_le_acl_data_packets == 0:
# Read the non-LE-specific values
response = await self.send_command(HCI_Read_Buffer_Size_Command())
logger.warn(f'HCI_Read_Local_Supported_Commands_Command failed: {response.return_parameters.status}')
if self.supports_command(HCI_WRITE_LE_HOST_SUPPORT_COMMAND):
await self.send_command(HCI_Write_LE_Host_Support_Command(le_supported_host = 1, simultaneous_le_host = 0))
if self.supports_command(HCI_READ_LOCAL_VERSION_INFORMATION_COMMAND):
response = await self.send_command(HCI_Read_Local_Version_Information_Command())
if response.return_parameters.status == HCI_SUCCESS:
self.hc_acl_data_packet_length = response.return_parameters.hc_le_acl_data_packet_length
self.hc_le_acl_data_packet_length = self.hc_le_acl_data_packet_length or self.hc_acl_data_packet_length
self.hc_total_num_acl_data_packets = response.return_parameters.hc_total_num_le_acl_data_packets
self.hc_total_num_le_acl_data_packets = self.hc_total_num_le_acl_data_packets or self.hc_total_num_acl_data_packets
logger.debug(f'HCI LE ACL flow control: hc_le_acl_data_packet_length={self.hc_le_acl_data_packet_length}, hc_total_num_le_acl_data_packets={self.hc_total_num_le_acl_data_packets}')
self.local_version = response.return_parameters
else:
logger.warn(f'HCI_Read_Buffer_Size_Command failed: {response.return_parameters.status}')
logger.warn(f'HCI_Read_Local_Version_Information_Command failed: {response.return_parameters.status}')
if self.supports_command(HCI_LE_READ_BUFFER_SIZE_COMMAND):
response = await self.send_command(HCI_LE_Read_Buffer_Size_Command())
if response.return_parameters.status == HCI_SUCCESS:
self.hc_le_acl_data_packet_length = response.return_parameters.hc_le_acl_data_packet_length
self.hc_total_num_le_acl_data_packets = response.return_parameters.hc_total_num_le_acl_data_packets
logger.debug(f'HCI LE ACL flow control: hc_le_acl_data_packet_length={response.return_parameters.hc_le_acl_data_packet_length}, hc_total_num_le_acl_data_packets={response.return_parameters.hc_total_num_le_acl_data_packets}')
else:
logger.warn(f'HCI_LE_Read_Buffer_Size_Command failed: {response.return_parameters.status}')
if response.return_parameters.hc_le_acl_data_packet_length == 0 or response.return_parameters.hc_total_num_le_acl_data_packets == 0:
# Read the non-LE-specific values
response = await self.send_command(HCI_Read_Buffer_Size_Command())
if response.return_parameters.status == HCI_SUCCESS:
self.hc_acl_data_packet_length = response.return_parameters.hc_le_acl_data_packet_length
self.hc_le_acl_data_packet_length = self.hc_le_acl_data_packet_length or self.hc_acl_data_packet_length
self.hc_total_num_acl_data_packets = response.return_parameters.hc_total_num_le_acl_data_packets
self.hc_total_num_le_acl_data_packets = self.hc_total_num_le_acl_data_packets or self.hc_total_num_acl_data_packets
logger.debug(f'HCI LE ACL flow control: hc_le_acl_data_packet_length={self.hc_le_acl_data_packet_length}, hc_total_num_le_acl_data_packets={self.hc_total_num_le_acl_data_packets}')
else:
logger.warn(f'HCI_Read_Buffer_Size_Command failed: {response.return_parameters.status}')
if self.supports_command(HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND):
response = await self.send_command(HCI_LE_Read_Local_Supported_Features_Command())
if response.return_parameters.status == HCI_SUCCESS:
self.local_le_features = struct.unpack('<Q', response.return_parameters.le_features)[0]
else:
logger.warn(f'HCI_LE_Read_Supported_Features_Command failed: {response.return_parameters.status}')
self.reset_done = True
@@ -211,6 +230,37 @@ class Host(EventEmitter):
self.send_hci_packet(packet)
self.acl_packets_in_flight += 1
def supports_command(self, command):
# Find the support flag position for this command
for (octet, flags) in enumerate(HCI_SUPPORTED_COMMANDS_FLAGS):
for (flag_position, value) in enumerate(flags):
if value == command:
# Check if the flag is set
if octet < len(self.local_supported_commands) and flag_position < 8:
return (self.local_supported_commands[octet] & (1 << flag_position)) != 0
return False
@property
def supported_commands(self):
commands = []
for (octet, flags) in enumerate(self.local_supported_commands):
if octet < len(HCI_SUPPORTED_COMMANDS_FLAGS):
for flag in range(8):
if flags & (1 << flag) != 0:
command = HCI_SUPPORTED_COMMANDS_FLAGS[octet][flag]
if command is not None:
commands.append(command)
return commands
def supports_le_feature(self, feature):
return (self.local_le_features & (1 << feature)) != 0
@property
def supported_le_features(self):
return [feature for feature in range(64) if self.local_le_features & (1 << feature)]
# Packet Sink protocol (packets coming from the controller via HCI)
def on_packet(self, packet):
hci_packet = HCI_Packet.from_bytes(packet)

View File

@@ -868,7 +868,7 @@ class Session:
# distribute the long term and/or other keys over an encrypted connection
asyncio.create_task(
self.manager.device.host.send_command(
HCI_LE_Start_Encryption_Command(
HCI_LE_Enable_Encryption_Command(
connection_handle = self.connection.handle,
random_number = bytes(8),
encrypted_diversifier = 0,

View File

@@ -43,28 +43,24 @@ async def main():
# Connect to the peer
target_address = sys.argv[2]
print(f'=== Connecting to {target_address}...')
connection = await device.connect(target_address)
print(f'=== Connected to {connection}')
async with device.connect_as_gatt(target_address) as peer:
print(f'=== Connected to {peer}')
battery_service = peer.create_service_proxy(BatteryServiceProxy)
# Discover the Battery Service
peer = Peer(connection)
print('=== Discovering Battery Service')
battery_service = await peer.discover_service_and_create_proxy(BatteryServiceProxy)
# Check that the service was found
if not battery_service:
print('!!! Service not found')
return
# Check that the service was found
if not battery_service:
print('!!! Service not found')
return
# Subscribe to and read the battery level
if battery_service.battery_level:
await battery_service.battery_level.subscribe(
lambda value: print(f'{color("Battery Level Update:", "green")} {value}')
)
value = await battery_service.battery_level.read_value()
print(f'{color("Initial Battery Level:", "green")} {value}')
# Subscribe to and read the battery level
if battery_service.battery_level:
await battery_service.battery_level.subscribe(
lambda value: print(f'{color("Battery Level Update:", "green")} {value}')
)
value = await battery_service.battery_level.read_value()
print(f'{color("Initial Battery Level:", "green")} {value}')
await hci_source.wait_for_termination()
await peer.sustain()
# -----------------------------------------------------------------------------

View File

@@ -43,31 +43,28 @@ async def main():
# Connect to the peer
target_address = sys.argv[2]
print(f'=== Connecting to {target_address}...')
connection = await device.connect(target_address)
print(f'=== Connected to {connection}')
async with device.connect_as_gatt(target_address) as peer:
print(f'=== Connected to {peer}')
# Discover the Heart Rate Service
peer = Peer(connection)
print('=== Discovering Heart Rate Service')
heart_rate_service = await peer.discover_service_and_create_proxy(HeartRateServiceProxy)
heart_rate_service = peer.create_service_proxy(HeartRateServiceProxy)
# Check that the service was found
if not heart_rate_service:
print('!!! Service not found')
return
# Check that the service was found
if not heart_rate_service:
print('!!! Service not found')
return
# Read the body sensor location
if heart_rate_service.body_sensor_location:
location = await heart_rate_service.body_sensor_location.read_value()
print(color('Sensor Location:', 'green'), location)
# Read the body sensor location
if heart_rate_service.body_sensor_location:
location = await heart_rate_service.body_sensor_location.read_value()
print(color('Sensor Location:', 'green'), location)
# Subscribe to the heart rate measurement
if heart_rate_service.heart_rate_measurement:
await heart_rate_service.heart_rate_measurement.subscribe(
lambda value: print(f'{color("Heart Rate Measurement:", "green")} {value}')
)
# Subscribe to the heart rate measurement
if heart_rate_service.heart_rate_measurement:
await heart_rate_service.heart_rate_measurement.subscribe(
lambda value: print(f'{color("Heart Rate Measurement:", "green")} {value}')
)
await hci_source.wait_for_termination()
await peer.sustain()
# -----------------------------------------------------------------------------

View File

@@ -294,8 +294,8 @@ def test_HCI_LE_Create_Connection_Command():
# -----------------------------------------------------------------------------
def test_HCI_LE_Add_Device_To_White_List_Command():
command = HCI_LE_Add_Device_To_White_List_Command(
def test_HCI_LE_Add_Device_To_Filter_Accept_List_Command():
command = HCI_LE_Add_Device_To_Filter_Accept_List_Command(
address_type = 1,
address = Address('00:11:22:33:44:55')
)
@@ -303,8 +303,8 @@ def test_HCI_LE_Add_Device_To_White_List_Command():
# -----------------------------------------------------------------------------
def test_HCI_LE_Remove_Device_From_White_List_Command():
command = HCI_LE_Remove_Device_From_White_List_Command(
def test_HCI_LE_Remove_Device_From_Filter_Accept_List_Command():
command = HCI_LE_Remove_Device_From_Filter_Accept_List_Command(
address_type = 1,
address = Address('00:11:22:33:44:55')
)
@@ -343,6 +343,23 @@ def test_HCI_LE_Set_Default_PHY_Command():
basic_check(command)
# -----------------------------------------------------------------------------
def test_HCI_LE_Set_Extended_Scan_Parameters_Command():
command = HCI_LE_Set_Extended_Scan_Parameters_Command(
own_address_type=Address.RANDOM_DEVICE_ADDRESS,
scanning_filter_policy=HCI_LE_Set_Extended_Scan_Parameters_Command.BASIC_FILTERED_POLICY,
scanning_phys=(1 << HCI_LE_Set_Extended_Scan_Parameters_Command.LE_1M_PHY | 1 << HCI_LE_Set_Extended_Scan_Parameters_Command.LE_CODED_PHY | 1 << 4),
scan_types=[
HCI_LE_Set_Extended_Scan_Parameters_Command.ACTIVE_SCANNING,
HCI_LE_Set_Extended_Scan_Parameters_Command.ACTIVE_SCANNING,
HCI_LE_Set_Extended_Scan_Parameters_Command.PASSIVE_SCANNING
],
scan_intervals=[1, 2, 3],
scan_windows=[4, 5, 6]
)
basic_check(command)
# -----------------------------------------------------------------------------
def test_address():
a = Address('C4:F2:17:1A:1D:BB')
@@ -391,11 +408,12 @@ def run_test_commands():
test_HCI_LE_Set_Scan_Parameters_Command()
test_HCI_LE_Set_Scan_Enable_Command()
test_HCI_LE_Create_Connection_Command()
test_HCI_LE_Add_Device_To_White_List_Command()
test_HCI_LE_Remove_Device_From_White_List_Command()
test_HCI_LE_Add_Device_To_Filter_Accept_List_Command()
test_HCI_LE_Remove_Device_From_Filter_Accept_List_Command()
test_HCI_LE_Connection_Update_Command()
test_HCI_LE_Read_Remote_Features_Command()
test_HCI_LE_Set_Default_PHY_Command()
test_HCI_LE_Set_Extended_Scan_Parameters_Command()
# -----------------------------------------------------------------------------