add AdvertisingType

This commit is contained in:
Gilles Boccon-Gibod
2022-10-15 21:43:08 -07:00
parent d10dda7e10
commit d5eebc2101
6 changed files with 192 additions and 54 deletions

View File

@@ -76,7 +76,7 @@ class Controller:
self.supported_commands = bytes.fromhex('2000800000c000000000e40000002822000000000000040000f7ffff7f00000030f0f9ff01008004000000000000000000000000000000000000000000000000')
self.le_features = bytes.fromhex('ff49010000000000')
self.le_states = bytes.fromhex('ffff3fffff030000')
self.avertising_channel_tx_power = 0
self.advertising_channel_tx_power = 0
self.filter_accept_list_size = 8
self.resolving_list_size = 8
self.supported_max_tx_octets = 27
@@ -650,7 +650,7 @@ class Controller:
'''
See Bluetooth spec Vol 2, Part E - 7.8.6 LE Read Advertising Channel Tx Power Command
'''
return bytes([HCI_SUCCESS, self.avertising_channel_tx_power])
return bytes([HCI_SUCCESS, self.advertising_channel_tx_power])
def on_hci_le_set_advertising_data_command(self, command):
'''

View File

@@ -15,6 +15,7 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from enum import IntEnum
import json
import asyncio
import logging
@@ -66,6 +67,9 @@ DEVICE_MIN_SCAN_INTERVAL = 25
DEVICE_MAX_SCAN_INTERVAL = 10240
DEVICE_MIN_SCAN_WINDOW = 25
DEVICE_MAX_SCAN_WINDOW = 10240
DEVICE_MIN_LE_RSSI = -127
DEVICE_MAX_LE_RSSI = 20
# -----------------------------------------------------------------------------
# Classes
@@ -199,6 +203,45 @@ class AdvertisementDataAccumulator:
return result
# -----------------------------------------------------------------------------
class AdvertisingType(IntEnum):
UNDIRECTED_CONNECTABLE_SCANNABLE = 0x00 # Undirected, connectable, scannable
DIRECTED_CONNECTABLE_HIGH_DUTY = 0x01 # Directed, connectable, non-scannable
UNDIRECTED_SCANNABLE = 0x02 # Undirected, non-connectable, scannable
UNDIRECTED = 0x03 # Undirected, non-connectable, non-scannable
DIRECTED_CONNECTABLE_LOW_DUTY = 0x04 # Directed, connectable, non-scannable
@property
def has_data(self):
return self in {
AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE,
AdvertisingType.UNDIRECTED_SCANNABLE,
AdvertisingType.UNDIRECTED
}
@property
def is_connectable(self):
return self in {
AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE,
AdvertisingType.DIRECTED_CONNECTABLE_HIGH_DUTY,
AdvertisingType.DIRECTED_CONNECTABLE_LOW_DUTY
}
@property
def is_scannable(self):
return self in {
AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE,
AdvertisingType.UNDIRECTED_SCANNABLE
}
@property
def is_directed(self):
return self in {
AdvertisingType.DIRECTED_CONNECTABLE_HIGH_DUTY,
AdvertisingType.DIRECTED_CONNECTABLE_LOW_DUTY
}
# -----------------------------------------------------------------------------
class Peer:
def __init__(self, connection):
@@ -298,7 +341,8 @@ class ConnectionParametersPreferences:
min_ce_length: int = DEVICE_DEFAULT_CONNECTION_MIN_CE_LENGTH
max_ce_length: int = DEVICE_DEFAULT_CONNECTION_MAX_CE_LENGTH
DEVICE_DEFAULT_CONNECTION_PARAMETER_PREFERENCES = ConnectionParametersPreferences()
ConnectionParametersPreferences.default = ConnectionParametersPreferences()
# -----------------------------------------------------------------------------
@@ -592,6 +636,7 @@ class Device(CompositeEventEmitter):
self._host = None
self.powered_on = False
self.advertising = False
self.advertising_type = None
self.auto_restart_advertising = False
self.command_timeout = 10 # seconds
self.gatt_server = gatt_server.Server(self)
@@ -811,32 +856,48 @@ class Device(CompositeEventEmitter):
return self.host.supports_le_feature(feature_map[phy])
async def start_advertising(self, auto_restart=False):
self.auto_restart_advertising = auto_restart
async def start_advertising(
self,
advertising_type=AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE,
target=None,
auto_restart=False
):
# If we're advertising, stop first
if self.advertising:
await self.stop_advertising()
# Set/update the advertising data
await self.send_command(HCI_LE_Set_Advertising_Data_Command(
advertising_data = self.advertising_data
), check_result=True)
# Set/update the advertising data if the advertising type allows it
if advertising_type.has_data:
await self.send_command(HCI_LE_Set_Advertising_Data_Command(
advertising_data = self.advertising_data
), check_result=True)
# Set/update the scan response data
await self.send_command(HCI_LE_Set_Scan_Response_Data_Command(
scan_response_data = self.scan_response_data
), check_result=True)
# Set/update the scan response data if the advertising is scannable
if advertising_type.is_scannable:
await self.send_command(HCI_LE_Set_Scan_Response_Data_Command(
scan_response_data = self.scan_response_data
), check_result=True)
# Decide what peer address to use
if advertising_type.is_directed:
if target is None:
raise ValueError('directed advertising requires a target address')
peer_address = target
peer_address_type = target.address_type
else:
peer_address = Address('00:00:00:00:00:00')
peer_address_type = Address.PUBLIC_DEVICE_ADDRESS
# Set the advertising parameters
await self.send_command(HCI_LE_Set_Advertising_Parameters_Command(
# TODO: use real values, not fixed ones
advertising_interval_min = self.advertising_interval_min,
advertising_interval_max = self.advertising_interval_max,
advertising_type = HCI_LE_Set_Advertising_Parameters_Command.ADV_IND,
advertising_type = int(advertising_type),
own_address_type = Address.RANDOM_DEVICE_ADDRESS, # TODO: allow using the public address
peer_address_type = Address.PUBLIC_DEVICE_ADDRESS,
peer_address = Address('00:00:00:00:00:00'),
peer_address_type = peer_address_type,
peer_address = peer_address,
advertising_channel_map = 7,
advertising_filter_policy = 0
), check_result=True)
@@ -846,7 +907,9 @@ class Device(CompositeEventEmitter):
advertising_enable = 1
), check_result=True)
self.advertising = True
self.auto_restart_advertising = auto_restart
self.advertising_type = advertising_type
self.advertising = True
async def stop_advertising(self):
# Disable advertising
@@ -855,7 +918,9 @@ class Device(CompositeEventEmitter):
advertising_enable = 0
), check_result=True)
self.advertising = False
self.advertising = False
self.advertising_type = None
self.auto_restart_advertising = False
@property
def is_advertising(self):
@@ -1088,9 +1153,9 @@ class Device(CompositeEventEmitter):
if connection_parameters_preferences is None:
if connection_parameters_preferences is None:
connection_parameters_preferences = {
HCI_LE_1M_PHY: DEVICE_DEFAULT_CONNECTION_PARAMETER_PREFERENCES,
HCI_LE_2M_PHY: DEVICE_DEFAULT_CONNECTION_PARAMETER_PREFERENCES,
HCI_LE_CODED_PHY: DEVICE_DEFAULT_CONNECTION_PARAMETER_PREFERENCES
HCI_LE_1M_PHY: ConnectionParametersPreferences.default,
HCI_LE_2M_PHY: ConnectionParametersPreferences.default,
HCI_LE_CODED_PHY: ConnectionParametersPreferences.default
}
if self.host.supports_command(HCI_LE_EXTENDED_CREATE_CONNECTION_COMMAND):
@@ -1610,7 +1675,13 @@ class Device(CompositeEventEmitter):
@host_event_handler
def on_connection_failure(self, connection_handle, error_code):
logger.debug(f'*** Connection failed: {error_code}')
logger.debug(f'*** Connection failed: {HCI_Constant.error_name(error_code)}')
# For directed advertising, this means a timeout
if self.advertising and self.advertising_type.is_directed:
self.advertising = False
# Notify listeners
error = ConnectionError(
error_code,
'hci',
@@ -1633,7 +1704,10 @@ class Device(CompositeEventEmitter):
# Restart advertising if auto-restart is enabled
if self.auto_restart_advertising:
logger.debug('restarting advertising')
asyncio.create_task(self.start_advertising(auto_restart=self.auto_restart_advertising))
asyncio.create_task(self.start_advertising(
advertising_type = self.advertising_type,
auto_restart = True
))
@host_event_handler
@with_connection_from_handle

View File

@@ -1491,7 +1491,7 @@ class HCI_Object:
def dict_to_bytes(object, fields):
result = bytearray()
for (field_name, field_type) in fields:
# The field_type may be a dictionnary with a mapper, parser, serializer, and/or size
# The field_type may be a dictionary with a mapper, parser, serializer, and/or size
serializer = None
if type(field_type) is dict:
if 'serializer' in field_type:
@@ -2826,18 +2826,18 @@ class HCI_LE_Set_Advertising_Parameters_Command(HCI_Command):
See Bluetooth spec @ 7.8.5 LE Set Advertising Parameters Command
'''
ADV_IND = 0x00
ADV_DIRECT_IND = 0x01
ADV_SCAN_IND = 0x02
ADV_NONCONN_IND = 0x03
ADV_DIRECT_IND = 0x04
ADV_IND = 0x00
ADV_DIRECT_IND = 0x01
ADV_SCAN_IND = 0x02
ADV_NONCONN_IND = 0x03
ADV_DIRECT_IND_LOW_DUTY = 0x04
ADVERTISING_TYPE_NAMES = {
ADV_IND: 'ADV_IND',
ADV_DIRECT_IND: 'ADV_DIRECT_IND',
ADV_SCAN_IND: 'ADV_SCAN_IND',
ADV_NONCONN_IND: 'ADV_NONCONN_IND',
ADV_DIRECT_IND: 'ADV_DIRECT_IND'
ADV_IND: 'ADV_IND',
ADV_DIRECT_IND: 'ADV_DIRECT_IND',
ADV_SCAN_IND: 'ADV_SCAN_IND',
ADV_NONCONN_IND: 'ADV_NONCONN_IND',
ADV_DIRECT_IND_LOW_DUTY: 'ADV_DIRECT_IND_LOW_DUTY'
}
@classmethod
@@ -4346,7 +4346,7 @@ class HCI_Encryption_Change_Event(HCI_Event):
E0_OR_AES_CCM = 0x01
AES_CCM = 0x02
ENCYRPTION_ENABLED_NAMES = {
ENCRYPTION_ENABLED_NAMES = {
OFF: 'OFF',
E0_OR_AES_CCM: 'E0_OR_AES_CCM',
AES_CCM: 'AES_CCM'
@@ -4354,7 +4354,7 @@ class HCI_Encryption_Change_Event(HCI_Event):
@staticmethod
def encryption_enabled_name(encryption_enabled):
return name_or_number(HCI_Encryption_Change_Event.ENCYRPTION_ENABLED_NAMES, encryption_enabled)
return name_or_number(HCI_Encryption_Change_Event.ENCRYPTION_ENABLED_NAMES, encryption_enabled)
# -----------------------------------------------------------------------------
@@ -4701,7 +4701,7 @@ class HCI_Synchronous_Connection_Complete_Event(HCI_Event):
U_LAW_LOG_AIR_MODE: 'u-law log',
A_LAW_LOG_AIR_MORE: 'A-law log',
CVSD_AIR_MODE: 'CVSD',
TRANSPARENT_DATA_AIR_MODE: 'Transparend Data'
TRANSPARENT_DATA_AIR_MODE: 'Transparent Data'
}
@staticmethod

View File

@@ -29,18 +29,31 @@ from bumble.transport import open_transport_or_link
# -----------------------------------------------------------------------------
async def main():
if len(sys.argv) != 3:
print('Usage: run_advertiser.py <config-file> <transport-spec>')
print('example: run_advertiser.py device1.json link-relay:ws://localhost:8888/test')
if len(sys.argv) < 3:
print('Usage: run_advertiser.py <config-file> <transport-spec> [type] [address]')
print('example: run_advertiser.py device1.json usb:0')
return
if len(sys.argv) >= 4:
advertising_type = AdvertisingType(int(sys.argv[3]))
else:
advertising_type = AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE
if advertising_type.is_directed:
if len(sys.argv) < 5:
print('<address> required for directed advertising')
return
target = Address(sys.argv[4])
else:
target = None
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
print('<<< connected')
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
await device.power_on()
await device.start_advertising()
await device.start_advertising(advertising_type=advertising_type, target=target)
await hci_source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -27,8 +27,8 @@ def basic_check(x):
parsed_str = str(parsed)
print(x_str)
parsed_bytes = parsed.to_bytes()
assert(x_str == parsed_str)
assert(packet == parsed_bytes)
assert x_str == parsed_str
assert packet == parsed_bytes
# -----------------------------------------------------------------------------
@@ -133,7 +133,7 @@ def test_HCI_Command_Complete_Event():
)
basic_check(event)
event = HCI_Packet.from_bytes(event.to_bytes())
assert(event.return_parameters == 7)
assert event.return_parameters == 7
# With a simple status as an integer status
event = HCI_Command_Complete_Event(
@@ -142,7 +142,7 @@ def test_HCI_Command_Complete_Event():
return_parameters = 9
)
basic_check(event)
assert(event.return_parameters == 9)
assert event.return_parameters == 9
# -----------------------------------------------------------------------------
@@ -383,20 +383,20 @@ def test_HCI_LE_Set_Extended_Scan_Parameters_Command():
# -----------------------------------------------------------------------------
def test_address():
a = Address('C4:F2:17:1A:1D:BB')
assert(not a.is_public)
assert(a.is_random)
assert(a.address_type == Address.RANDOM_DEVICE_ADDRESS)
assert(not a.is_resolvable)
assert(not a.is_resolved)
assert(a.is_static)
assert not a.is_public
assert a.is_random
assert a.address_type == Address.RANDOM_DEVICE_ADDRESS
assert not a.is_resolvable
assert not a.is_resolved
assert a.is_static
# -----------------------------------------------------------------------------
def test_custom():
data = bytes([0x77, 0x02, 0x01, 0x03])
packet = HCI_CustomPacket(data)
assert(packet.hci_packet_type == 0x77)
assert(packet.payload == data)
assert packet.hci_packet_type == 0x77
assert packet.payload == data
# -----------------------------------------------------------------------------

View File

@@ -62,6 +62,57 @@ def test_import():
assert utils
# -----------------------------------------------------------------------------
def test_app_imports():
from bumble.apps.console import main
assert main
from bumble.apps.controller_info import main
assert main
from bumble.apps.controllers import main
assert main
from bumble.apps.gatt_dump import main
assert main
from bumble.apps.gg_bridge import main
assert main
from bumble.apps.hci_bridge import main
assert main
from bumble.apps.pair import main
assert main
from bumble.apps.scan import main
assert main
from bumble.apps.show import main
assert main
from bumble.apps.unbond import main
assert main
from bumble.apps.usb_probe import main
assert main
# -----------------------------------------------------------------------------
def test_profiles_imports():
from bumble.profiles import (
battery_service,
device_information_service,
heart_rate_service
)
assert battery_service
assert device_information_service
assert heart_rate_service
# -----------------------------------------------------------------------------
if __name__ == '__main__':
test_import()
test_app_imports()
test_profiles_imports()