From de7e74652d5821cddf0b7409db1015cf2c1224ad Mon Sep 17 00:00:00 2001 From: Gilles Boccon-Gibod Date: Sat, 6 Aug 2022 14:08:24 -0700 Subject: [PATCH] more HCI commands --- apps/console.py | 14 +- apps/scan.py | 14 +- bumble/device.py | 169 +++++++-- bumble/hci.py | 454 +++++++++++++++++++----- bumble/host.py | 12 +- examples/run_controller_with_scanner.py | 8 +- examples/run_scanner.py | 12 +- web/scanner.py | 6 +- 8 files changed, 540 insertions(+), 149 deletions(-) diff --git a/apps/console.py b/apps/console.py index 7ddbae40..48a94819 100644 --- a/apps/console.py +++ b/apps/console.py @@ -570,16 +570,16 @@ class DeviceListener(Device.Listener, Connection.Listener): def on_connection_data_length_change(self): self.app.append_to_output(f'connection data length change: {self.app.connected_peer.connection.data_length}') - def on_advertisement(self, address, ad_data, rssi, connectable): - entry_key = f'{address}/{address.address_type}' + def on_advertisement(self, advertisement): + entry_key = f'{advertisement.address}/{advertisement.address.address_type}' entry = self.scan_results.get(entry_key) if entry: - entry.ad_data = ad_data - entry.rssi = rssi - entry.connectable = connectable + entry.ad_data = advertisement.data + entry.rssi = advertisement.rssi + entry.connectable = advertisement.connectable else: - self.app.add_known_address(str(address)) - self.scan_results[entry_key] = ScanResult(address, address.address_type, ad_data, rssi, connectable) + self.app.add_known_address(str(advertisement.address)) + self.scan_results[entry_key] = ScanResult(advertisement.address, advertisement.address.address_type, advertisement.data, advertisement.rssi, advertisement.is_connectable) self.app.show_scan_results(self.scan_results) diff --git a/apps/scan.py b/apps/scan.py index 045cb57d..b7e2715c 100644 --- a/apps/scan.py +++ b/apps/scan.py @@ -78,14 +78,14 @@ class AdvertisementPrinter: separator = '\n ' print(f'>>> {color(address, address_color)} [{color(address_type_string, type_color)}]{address_qualifier}{resolution_qualifier}:{separator}RSSI:{rssi:4} {rssi_bar}{separator}{ad_data.to_string(separator)}\n') - def on_advertisement(self, address, ad_data, rssi, connectable): - address_color = 'yellow' if connectable else 'red' - self.print_advertisement(address, address_color, ad_data, rssi) + def on_advertisement(self, advertisement): + address_color = 'yellow' if advertisement.is_connectable else 'red' + self.print_advertisement(advertisement.address, address_color, advertisement.data, advertisement.rssi) - def on_advertising_report(self, address, ad_data, rssi, event_type): - print(f'{color("EVENT", "green")}: {HCI_LE_Advertising_Report_Event.event_type_name(event_type)}') - ad_data = AdvertisingData.from_bytes(ad_data) - self.print_advertisement(address, 'yellow', ad_data, rssi) + def on_advertising_report(self, report): + print(f'{color("EVENT", "green")}: {HCI_LE_Advertising_Report_Event.event_type_name(report.event_type)}') + data = AdvertisingData.from_bytes(report.data) + self.print_advertisement(report.address, 'yellow', data, report.rssi) # ----------------------------------------------------------------------------- diff --git a/bumble/device.py b/bumble/device.py index dbc75fee..2e890b76 100644 --- a/bumble/device.py +++ b/bumble/device.py @@ -18,7 +18,7 @@ import json import asyncio import logging -from contextlib import asynccontextmanager, AsyncExitStack +from contextlib import asynccontextmanager, AsyncExitStack from .hci import * from .host import Host @@ -61,29 +61,134 @@ DEVICE_MAX_SCAN_WINDOW = 10240 # ----------------------------------------------------------------------------- +# ----------------------------------------------------------------------------- +class Advertisement: + TX_POWER_NOT_AVAILABLE = HCI_LE_Extended_Advertising_Report_Event.TX_POWER_INFORMATION_NOT_AVAILABLE + RSSI_NOT_AVAILABLE = HCI_LE_Extended_Advertising_Report_Event.RSSI_NOT_AVAILABLE + + @classmethod + def from_advertising_report(cls, report): + if isinstance(report, HCI_LE_Advertising_Report_Event.Report): + return LegacyAdvertisement.from_advertising_report(report) + elif isinstance(report, HCI_LE_Extended_Advertising_Report_Event.Report): + return ExtendedAdvertisement.from_advertising_report(report) + + def __init__( + self, + address, + rssi = HCI_LE_Extended_Advertising_Report_Event.RSSI_NOT_AVAILABLE, + is_legacy = False, + is_anonymous = False, + is_connectable = False, + is_directed = False, + is_scannable = False, + is_scan_response = False, + primary_phy = 0, + secondary_phy = 0, + tx_power = HCI_LE_Extended_Advertising_Report_Event.TX_POWER_INFORMATION_NOT_AVAILABLE, + sid = 0, + data = b'' + ): + self.address = address + self.rssi = rssi + self.is_legacy = is_legacy + self.is_anonymous = is_anonymous + self.is_connectable = is_connectable + self.is_directed = is_directed + self.is_scannable = is_scannable + self.is_scan_response = is_scan_response + self.primary_phy = primary_phy + self.secondary_phy = secondary_phy + self.tx_power = tx_power + self.sid = sid + self.data = AdvertisingData.from_bytes(data) + + +# ----------------------------------------------------------------------------- +class LegacyAdvertisement(Advertisement): + @classmethod + def from_advertising_report(cls, report): + return cls( + address = report.address, + rssi = report.rssi, + is_legacy = True, + is_connectable = report.event_type in { + HCI_LE_Advertising_Report_Event.ADV_IND, + HCI_LE_Advertising_Report_Event.ADV_DIRECT_IND + }, + is_directed = report.event_type == HCI_LE_Advertising_Report_Event.ADV_DIRECT_IND, + is_scannable = report.event_type in { + HCI_LE_Advertising_Report_Event.ADV_IND, + HCI_LE_Advertising_Report_Event.ADV_SCAN_IND + }, + is_scan_response = report.event_type == HCI_LE_Advertising_Report_Event.SCAN_RSP, + data = report.data + ) + + +# ----------------------------------------------------------------------------- +class ExtendedAdvertisement(Advertisement): + @classmethod + def from_advertising_report(cls, report): + return cls( + address = report.address, + rssi = report.rssi, + is_legacy = report.event_type & (1 << HCI_LE_Extended_Advertising_Report_Event.LEGACY_ADVERTISING_PDU_USED) != 0, + is_anonymous = report.address.address_type == HCI_LE_Extended_Advertising_Report_Event.ANONYMOUS_ADDRESS_TYPE, + is_connectable = report.event_type & (1 << HCI_LE_Extended_Advertising_Report_Event.CONNECTABLE_ADVERTISING) != 0, + is_directed = report.event_type & (1 << HCI_LE_Extended_Advertising_Report_Event.DIRECTED_ADVERTISING) != 0, + is_scannable = report.event_type & (1 << HCI_LE_Extended_Advertising_Report_Event.SCANNABLE_ADVERTISING) != 0, + is_scan_response = report.event_type & (1 << HCI_LE_Extended_Advertising_Report_Event.SCAN_RESPONSE) != 0, + primary_phy = report.primary_phy, + secondary_phy = report.secondary_phy, + tx_power = report.tx_power, + sid = report.advertising_sid, + data = report.data + ) + + # ----------------------------------------------------------------------------- class AdvertisementDataAccumulator: - def __init__(self): - self.advertising_data = AdvertisingData() - self.last_advertisement_type = None - self.connectable = False - self.flushable = False + def __init__(self, passive=False): + self.passive = passive + self.last_event_type = None + self.advertisement = None + self.data = b'' - def update(self, data, advertisement_type): - if advertisement_type == HCI_LE_Advertising_Report_Event.SCAN_RSP: - if self.last_advertisement_type != HCI_LE_Advertising_Report_Event.SCAN_RSP: - self.advertising_data.append(data) - self.flushable = True - else: - self.advertising_data = AdvertisingData.from_bytes(data) - self.flushable = self.last_advertisement_type != HCI_LE_Advertising_Report_Event.SCAN_RSP + def update(self, report): + if isinstance(report, HCI_LE_Advertising_Report_Event.Report): + if report.event_type == HCI_LE_Advertising_Report_Event.SCAN_RSP: + if self.last_event_type in { + HCI_LE_Advertising_Report_Event.ADV_IND, + HCI_LE_Advertising_Report_Event.ADV_SCAN_IND + }: + # This is the response to a scannable advertisement + self.advertisement = Advertisement.from_advertising_report(report) + self.advertisement.data = AdvertisingData.from_bytes(self.data + report.data) + self.advertisement.is_connectable = (self.last_event_type == HCI_LE_Advertising_Report_Event.ADV_IND) + else: + # Unexpected scan response + self.advertisement = None - if advertisement_type == HCI_LE_Advertising_Report_Event.ADV_IND or advertisement_type == HCI_LE_Advertising_Report_Event.ADV_DIRECT_IND: - self.connectable = True - elif advertisement_type == HCI_LE_Advertising_Report_Event.ADV_SCAN_IND or advertisement_type == HCI_LE_Advertising_Report_Event.ADV_NONCONN_IND: - self.connectable = False + # Reset the data + self.data = b'' + else: + self.data = report.data - self.last_advertisement_type = advertisement_type + if self.passive or report.event_type in { + HCI_LE_Advertising_Report_Event.ADV_DIRECT_IND, + HCI_LE_Advertising_Report_Event.ADV_NONCONN_IND + } or self.last_event_type not in { + None, + HCI_LE_Advertising_Report_Event.SCAN_RSP + }: + # Don't wait for a scan response + self.advertisement = Advertisement.from_advertising_report(report) + else: + # Wait for a scan response + self.advertisement = None + + self.last_event_type = report.event_type # ----------------------------------------------------------------------------- @@ -403,7 +508,7 @@ class Device(CompositeEventEmitter): @composite_listener class Listener: - def on_advertisement(self, address, data, rssi, advertisement_type): + def on_advertisement(self, advertisement): pass def on_inquiry_result(self, address, class_of_device, data, rssi): @@ -454,6 +559,7 @@ class Device(CompositeEventEmitter): [l2cap.L2CAP_Information_Request.EXTENDED_FEATURE_FIXED_CHANNELS]) self.advertisement_data = {} self.scanning = False + self.scanning_is_passive = False self.discovering = False self.connecting = False self.disconnecting = False @@ -757,7 +863,8 @@ class Device(CompositeEventEmitter): filter_duplicates = 1 if filter_duplicates else 0 )) - self.scanning = True + self.scanning = True + self.scanning_is_passive = not active async def stop_scanning(self): await self.send_command(HCI_LE_Set_Scan_Enable_Command( @@ -771,19 +878,13 @@ class Device(CompositeEventEmitter): return self.scanning @host_event_handler - def on_advertising_report(self, address, data, rssi, advertisement_type): - if not (accumulator := self.advertisement_data.get(address)): - accumulator = AdvertisementDataAccumulator() - self.advertisement_data[address] = accumulator - accumulator.update(data, advertisement_type) - if accumulator.flushable: - self.emit( - 'advertisement', - address, - accumulator.advertising_data, - rssi, - accumulator.connectable - ) + def on_advertising_report(self, report): + if not (accumulator := self.advertisement_data.get(report.address)): + accumulator = AdvertisementDataAccumulator(passive=self.scanning_is_passive) + self.advertisement_data[report.address] = accumulator + accumulator.update(report) + if accumulator.advertisement is not None: + self.emit('advertisement', accumulator.advertisement) async def start_discovery(self): await self.host.send_command(HCI_Write_Inquiry_Mode_Command(inquiry_mode=HCI_EXTENDED_INQUIRY_MODE)) diff --git a/bumble/hci.py b/bumble/hci.py index 1400baac..17205749 100644 --- a/bumble/hci.py +++ b/bumble/hci.py @@ -1373,11 +1373,14 @@ class HCI_Error(ProtocolError): super().__init__(error_code, 'hci', HCI_Constant.error_name(error_code)) +# ----------------------------------------------------------------------------- class HCI_StatusError(ProtocolError): def __init__(self, response): - super().__init__(response.status, - error_namespace=HCI_Command.command_name(response.command_opcode), - error_name=HCI_Constant.status_name(response.status)) + super().__init__( + response.status, + error_namespace=HCI_Command.command_name(response.command_opcode), + error_name=HCI_Constant.status_name(response.status) + ) # ----------------------------------------------------------------------------- @@ -1402,7 +1405,7 @@ class HCI_Object: def dict_from_bytes(data, offset, fields): result = collections.OrderedDict() for (field_name, field_type) in fields: - # The field_type may be a dictionnary with a mapper, parser, and/or size + # The field_type may be a dictionary with a mapper, parser, and/or size if type(field_type) is dict: if 'size' in field_type: field_type = field_type['size'] @@ -1523,9 +1526,9 @@ class HCI_Object: return bytes(result) - @staticmethod - def from_bytes(data, offset, fields): - return HCI_Object(fields, **HCI_Object.dict_from_bytes(data, offset, fields)) + @classmethod + def from_bytes(cls, data, offset, fields): + return cls(fields, **cls.dict_from_bytes(data, offset, fields)) def to_bytes(self): return HCI_Object.dict_to_bytes(self.__dict__, self.fields) @@ -3024,6 +3027,24 @@ class HCI_LE_Remote_Connection_Parameter_Request_Negative_Reply_Command(HCI_Comm ''' +# ----------------------------------------------------------------------------- +@HCI_Command.command( + fields=[ + ('connection_handle', 2), + ('tx_octets', 2), + ('tx_time', 2), + ], + return_parameters_fields=[ + ('status', STATUS_SPEC), + ('connection_handle', 2) + ] +) +class HCI_LE_Set_Data_Length_Command(HCI_Command): + ''' + See Bluetooth spec @ 7.8.33 LE Set Data Length Command + ''' + + # ----------------------------------------------------------------------------- @HCI_Command.command([ ('suggested_max_tx_octets', 2), @@ -3102,6 +3123,228 @@ class HCI_LE_Read_Maximum_Data_Length_Command(HCI_Command): ''' +# ----------------------------------------------------------------------------- +@HCI_Command.command( + fields=[ + ('connection_handle', 2) + ], + return_parameters_fields=[ + ('status', STATUS_SPEC), + ('connection_handle', 2), + ('tx_phy', {'size': 1, 'mapper': HCI_Constant.le_phy_name}), + ('rx_phy', {'size': 1, 'mapper': HCI_Constant.le_phy_name}) + ]) +class HCI_LE_Read_PHY_Command(HCI_Command): + ''' + See Bluetooth spec @ 7.8.47 LE Read PHY Command + ''' + + +# ----------------------------------------------------------------------------- +@HCI_Command.command([ + ('connection_handle', 2), + ('all_phys', 1), + ('tx_phys', 1), + ('rx_phys', 1), + ('phy_options', 2) +]) +class HCI_LE_Set_PHY_Command(HCI_Command): + ''' + See Bluetooth spec @ 7.8.49 LE Set PHY Command + ''' + + +# ----------------------------------------------------------------------------- +@HCI_Command.command([ + ('advertising_handle', 1), + ('random_address', lambda data, offset: Address.parse_address_with_type(data, offset, Address.RANDOM_DEVICE_ADDRESS)) +]) +class HCI_LE_Set_Advertising_Set_Random_Address_Command(HCI_Command): + ''' + See Bluetooth spec @ 7.8.52 LE Set Advertising Set Random Address Command + ''' + + +# ----------------------------------------------------------------------------- +@HCI_Command.command( + fields=[ + ('advertising_handle', 1), + ('advertising_event_properties', {'size': 2, 'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Parameters_Command.advertising_properties_string(x)}), + ('primary_advertising_interval_min', 3), + ('primary_advertising_interval_max', 3), + ('primary_advertising_channel_map', {'size': 1, 'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Parameters_Command.channel_map_string(x)}), + ('own_address_type', Address.ADDRESS_TYPE_SPEC), + ('peer_address_type', Address.ADDRESS_TYPE_SPEC), + ('peer_address', Address.parse_address_preceded_by_type), + ('advertising_filter_policy', 1), + ('advertising_tx_power', 1), + ('primary_advertising_phy', {'size': 1, 'mapper': HCI_Constant.le_phy_name}), + ('secondary_advertising_max_skip', 1), + ('secondary_advertising_phy', {'size': 1, 'mapper': HCI_Constant.le_phy_name}), + ('advertising_sid', 1), + ('scan_request_notification_enable', 1) + ], + return_parameters_fields=[ + ('status', STATUS_SPEC), + ('selected_tx__power', 1) + ] +) +class HCI_LE_Set_Extended_Advertising_Parameters_Command(HCI_Command): + ''' + See Bluetooth spec @ 7.8.53 LE Set Extended Advertising Parameters Command + ''' + + CONNECTABLE_ADVERTISING = 0 + SCANNABLE_ADVERTISING = 1 + DIRECTED_ADVERTISING = 2 + HIGH_DUTY_CYCLE_DIRECTED_CONNECTABLE_ADVERTISING = 3 + USE_LEGACY_ADVERTISING_PDUS = 4 + ANONYMOUS_ADVERTISING = 5 + INCLUDE_TX_POWER = 6 + + ADVERTISING_PROPERTIES_NAMES = ( + 'CONNECTABLE_ADVERTISING', + 'SCANNABLE_ADVERTISING', + 'DIRECTED_ADVERTISING', + 'HIGH_DUTY_CYCLE_DIRECTED_CONNECTABLE_ADVERTISING', + 'USE_LEGACY_ADVERTISING_PDUS', + 'ANONYMOUS_ADVERTISING', + 'INCLUDE_TX_POWER' + ) + + CHANNEL_37 = 0 + CHANNEL_38 = 1 + CHANNEL_39 = 2 + + CHANNEL_NAMES = ('37', '38', '39') + + @classmethod + def advertising_properties_string(cls, properties): + return f'[{",".join(bit_flags_to_strings(properties, cls.ADVERTISING_PROPERTIES_NAMES))}]' + + @classmethod + def channel_map_string(cls, channel_map): + return f'[{",".join(bit_flags_to_strings(channel_map, cls.CHANNEL_NAMES))}]' + + +# ----------------------------------------------------------------------------- +@HCI_Command.command([ + ('advertising_handle', 1), + ('operation', {'size': 1, 'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Data_Command.operation_name(x)}), + ('fragment_preference', 1), + ('advertising_data', { + 'parser': HCI_Object.parse_length_prefixed_bytes, + 'serializer': functools.partial(HCI_Object.serialize_length_prefixed_bytes) + }) +]) +class HCI_LE_Set_Extended_Advertising_Data_Command(HCI_Command): + ''' + See Bluetooth spec @ 7.8.54 LE Set Extended Advertising Data Command + ''' + + INTERMEDIATE_FRAGMENT = 0x00 + FIRST_FRAGMENT = 0x01 + LAST_FRAGMENT = 0x02 + COMPLETE_DATA = 0x03 + UNCHANGED_DATA = 0x04 + + OPERATION_NAMES = { + INTERMEDIATE_FRAGMENT: 'INTERMEDIATE_FRAGMENT', + FIRST_FRAGMENT: 'FIRST_FRAGMENT', + LAST_FRAGMENT: 'LAST_FRAGMENT', + COMPLETE_DATA: 'COMPLETE_DATA', + UNCHANGED_DATA: 'UNCHANGED_DATA' + } + + @classmethod + def operation_name(cls, operation): + return name_or_number(cls.OPERATION_NAMES, operation) + + +# ----------------------------------------------------------------------------- +@HCI_Command.command([ + ('advertising_handle', 1), + ('operation', {'size': 1, 'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Data_Command.operation_name(x)}), + ('fragment_preference', 1), + ('scan_response_data', { + 'parser': HCI_Object.parse_length_prefixed_bytes, + 'serializer': functools.partial(HCI_Object.serialize_length_prefixed_bytes) + }) +]) +class HCI_LE_Set_Extended_Scan_Response_Data_Command(HCI_Command): + ''' + See Bluetooth spec @ 7.8.55 LE Set Extended Scan Response Data Command + ''' + + INTERMEDIATE_FRAGMENT = 0x00 + FIRST_FRAGMENT = 0x01 + LAST_FRAGMENT = 0x02 + COMPLETE_DATA = 0x03 + + OPERATION_NAMES = { + INTERMEDIATE_FRAGMENT: 'INTERMEDIATE_FRAGMENT', + FIRST_FRAGMENT: 'FIRST_FRAGMENT', + LAST_FRAGMENT: 'LAST_FRAGMENT', + COMPLETE_DATA: 'COMPLETE_DATA' + } + + @classmethod + def operation_name(cls, operation): + return name_or_number(cls.OPERATION_NAMES, operation) + + +# ----------------------------------------------------------------------------- +@HCI_Command.command(fields=None) +class HCI_LE_Set_Extended_Advertising_Enable_Command(HCI_Command): + ''' + See Bluetooth spec @ 7.8.56 LE Set Extended Advertising Enable Command + ''' + + @classmethod + def from_parameters(cls, parameters): + enable = parameters[0] + num_sets = parameters[1] + advertising_handles = [] + durations = [] + max_extended_advertising_events = [] + offset = 2 + for _ in range(num_sets): + advertising_handles.append(parameters[offset]) + durations.append(struct.unpack_from('> 5) & 3]) + + if event_type & (1 << HCI_LE_Extended_Advertising_Report_Event.LEGACY_ADVERTISING_PDU_USED): + legacy_pdu_type = HCI_LE_Extended_Advertising_Report_Event.LEGACY_PDU_TYPE_MAP.get(event_type & 0x0F) + if legacy_pdu_type is not None: + legacy_info_string = f'({HCI_LE_Advertising_Report_Event.event_type_name(legacy_pdu_type)})' + else: + legacy_info_string = '' + else: + legacy_info_string = '' + + return f'0x{event_type:04X} [{",".join(event_type_flags)}]{legacy_info_string}' + + return super().to_string(prefix, { + 'event_type': event_type_string, + 'address_type': Address.address_type_name, + 'data': lambda x: str(AdvertisingData.from_bytes(x)) + }) @classmethod def from_parameters(cls, parameters): @@ -3643,7 +3958,7 @@ class HCI_LE_Extended_Advertising_Report_Event(HCI_LE_Meta_Event): reports = [] offset = 2 for _ in range(num_reports): - report = HCI_Object.from_bytes(parameters, offset, cls.REPORT_FIELDS) + report = cls.Report.from_parameters(parameters, offset) offset += 24 + len(report.data) reports.append(report) @@ -3658,30 +3973,7 @@ class HCI_LE_Extended_Advertising_Report_Event(HCI_LE_Meta_Event): super().__init__(self.subevent_code, parameters) def __str__(self): - def event_type_string(event_type): - event_type_flags = [] - for bit in range(0, 5): - if event_type & (1 << bit): - event_type_flags.append(self.EVENT_TYPE_NAMES[bit]) - event_type_flags.append(('COMPLETE', 'INCOMPLETE+', 'INCOMPLETE#', '?')[(event_type >> 5) & 3]) - - if event_type & (1 << self.LEGACY_ADVERTISING_PDU_USED): - legacy_pdu_type = self.LEGACY_PDU_TYPE_MAP.get(event_type & 0x0F) - if legacy_pdu_type is not None: - legacy_info_string = f'({HCI_LE_Advertising_Report_Event.event_type_name(legacy_pdu_type)})' - else: - legacy_info_string = '' - else: - legacy_info_string = '' - - return f'0x{event_type:04X} [{",".join(event_type_flags)}]{legacy_info_string}' - - reports = '\n'.join([report.to_string(' ', { - 'event_type': event_type_string, - 'address_type': Address.address_type_name, - 'data': lambda x: str(AdvertisingData.from_bytes(x)) - }) for report in self.reports]) - + reports = '\n'.join([report.to_string(' ') for report in self.reports]) return f'{color(self.subevent_name(self.subevent_code), "magenta")}:\n{reports}' diff --git a/bumble/host.py b/bumble/host.py index cc692d0b..67ad759c 100644 --- a/bumble/host.py +++ b/bumble/host.py @@ -467,13 +467,11 @@ class Host(EventEmitter): def on_hci_le_advertising_report_event(self, event): for report in event.reports: - self.emit( - 'advertising_report', - report.address, - report.data, - report.rssi, - report.event_type - ) + self.emit('advertising_report', report) + + def on_hci_le_extended_advertising_report_event(self, event): + # TODO + pass def on_hci_le_remote_connection_parameter_request_event(self, event): if event.connection_handle not in self.connections: diff --git a/examples/run_controller_with_scanner.py b/examples/run_controller_with_scanner.py index 88bc1f8c..18ba2743 100644 --- a/examples/run_controller_with_scanner.py +++ b/examples/run_controller_with_scanner.py @@ -29,15 +29,15 @@ from bumble.transport import open_transport_or_link # ----------------------------------------------------------------------------- class ScannerListener(Device.Listener): - def on_advertisement(self, address, ad_data, rssi, connectable): - address_type_string = ('P', 'R', 'PI', 'RI')[address.address_type] - address_color = 'yellow' if connectable else 'red' + def on_advertisement(self, advertisement): + address_type_string = ('P', 'R', 'PI', 'RI')[advertisement.address.address_type] + address_color = 'yellow' if advertisement.is_connectable else 'red' if address_type_string.startswith('P'): type_color = 'green' else: type_color = 'cyan' - print(f'>>> {color(address, address_color)} [{color(address_type_string, type_color)}]: RSSI={rssi}, {ad_data}') + print(f'>>> {color(advertisement.address, address_color)} [{color(address_type_string, type_color)}]: RSSI={advertisement.rssi}, {advertisement.data}') # ----------------------------------------------------------------------------- diff --git a/examples/run_scanner.py b/examples/run_scanner.py index feed88f4..719e58ed 100644 --- a/examples/run_scanner.py +++ b/examples/run_scanner.py @@ -40,24 +40,24 @@ async def main(): device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink) @device.on('advertisement') - def _(address, ad_data, rssi, connectable): - address_type_string = ('PUBLIC', 'RANDOM', 'PUBLIC_ID', 'RANDOM_ID')[address.address_type] - address_color = 'yellow' if connectable else 'red' + def _(advertisement): + address_type_string = ('PUBLIC', 'RANDOM', 'PUBLIC_ID', 'RANDOM_ID')[advertisement.address.address_type] + address_color = 'yellow' if advertisement.is_connectable else 'red' address_qualifier = '' if address_type_string.startswith('P'): type_color = 'cyan' else: - if address.is_static: + if advertisement.address.is_static: type_color = 'green' address_qualifier = '(static)' - elif address.is_resolvable: + elif advertisement.address.is_resolvable: type_color = 'magenta' address_qualifier = '(resolvable)' else: type_color = 'white' separator = '\n ' - print(f'>>> {color(address, address_color)} [{color(address_type_string, type_color)}]{address_qualifier}:{separator}RSSI:{rssi}{separator}{ad_data.to_string(separator)}') + print(f'>>> {color(advertisement.address, address_color)} [{color(address_type_string, type_color)}]{address_qualifier}:{separator}RSSI:{advertisement.rssi}{separator}{advertisement.data.to_string(separator)}') await device.power_on() await device.start_scanning(filter_duplicates=filter_duplicates) diff --git a/web/scanner.py b/web/scanner.py index 9ab9f471..e734dbf2 100644 --- a/web/scanner.py +++ b/web/scanner.py @@ -21,9 +21,9 @@ from bumble.transport import PacketParser # ----------------------------------------------------------------------------- class ScannerListener(Device.Listener): - def on_advertisement(self, address, ad_data, rssi, connectable): - address_type_string = ('P', 'R', 'PI', 'RI')[address.address_type] - print(f'>>> {address} [{address_type_string}]: RSSI={rssi}, {ad_data}') + def on_advertisement(self, advertisement): + address_type_string = ('P', 'R', 'PI', 'RI')[advertisement.address.address_type] + print(f'>>> {advertisement.address} [{address_type_string}]: RSSI={advertisement.rssi}, {advertisement.ad_data}') class HciSource: