address PR comments

This commit is contained in:
Gilles Boccon-Gibod
2023-11-07 15:30:06 -08:00
parent 8a536cd522
commit 46239b321b
3 changed files with 89 additions and 16 deletions

View File

@@ -50,8 +50,10 @@ from bumble.sdp import (
SDP_PUBLIC_BROWSE_ROOT,
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
DataElement,
ServiceAttribute,
Client as SdpClient,
)
from bumble.transport import open_transport_or_link
import bumble.rfcomm
@@ -195,6 +197,50 @@ def make_sdp_records(channel):
}
async def find_rfcomm_channel_with_uuid(
device: Device, connection: Connection, uuid: str
) -> int:
# Connect to the SDP Server
sdp_client = SdpClient(device)
await sdp_client.connect(connection)
# Search for services with an L2CAP service attribute
search_result = await sdp_client.search_attributes(
[BT_L2CAP_PROTOCOL_ID],
[
SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
],
)
for attribute_list in search_result:
service_uuid = None
service_class_id_list = ServiceAttribute.find_attribute_in_list(
attribute_list, SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID
)
if service_class_id_list:
if service_class_id_list.value:
for service_class_id in service_class_id_list.value:
service_uuid = service_class_id.value
if str(service_uuid) != uuid:
# This service doesn't have a UUID or isn't the right one.
continue
# Look for the RFCOMM Channel number
protocol_descriptor_list = ServiceAttribute.find_attribute_in_list(
attribute_list, SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID
)
if protocol_descriptor_list:
for protocol_descriptor in protocol_descriptor_list.value:
if len(protocol_descriptor.value) >= 2:
if protocol_descriptor.value[0].value == BT_RFCOMM_PROTOCOL_ID:
await sdp_client.disconnect()
return protocol_descriptor.value[1].value
await sdp_client.disconnect()
return 0
class PacketType(enum.IntEnum):
RESET = 0
SEQUENCE = 1
@@ -747,24 +793,40 @@ class L2capServer(StreamedPacketIO):
# RfcommClient
# -----------------------------------------------------------------------------
class RfcommClient(StreamedPacketIO):
def __init__(self, device, channel):
def __init__(self, device, channel, uuid):
super().__init__()
self.device = device
self.channel = channel
self.uuid = uuid
self.ready = asyncio.Event()
async def on_connection(self, connection):
connection.on('disconnection', self.on_disconnection)
# Find the channel number if not specified
channel = self.channel
if channel == 0:
print(
color(f'@@@ Discovering channel number from UUID {self.uuid}', 'cyan')
)
channel = await find_rfcomm_channel_with_uuid(
self.device, connection, self.uuid
)
print(color(f'@@@ Channel number = {channel}', 'cyan'))
if channel == 0:
print(color('!!! No RFComm service with this UUID found', 'red'))
await connection.disconnect()
return
# Create a client and start it
print(color('*** Starting RFCOMM client...', 'blue'))
rfcomm_client = bumble.rfcomm.Client(self.device, connection)
rfcomm_mux = await rfcomm_client.start()
print(color('*** Started', 'blue'))
print(color(f'### Opening session for channel {self.channel}...', 'yellow'))
print(color(f'### Opening session for channel {channel}...', 'yellow'))
try:
rfcomm_session = await rfcomm_mux.open_dlc(self.channel)
rfcomm_session = await rfcomm_mux.open_dlc(channel)
print(color('### Session open', 'yellow'), rfcomm_session)
except bumble.core.ConnectionError as error:
print(color(f'!!! Session open failed: {error}', 'red'))
@@ -1087,7 +1149,9 @@ def create_mode_factory(ctx, default_mode):
return L2capServer(device, psm=ctx.obj['l2cap_psm'])
if mode == 'rfcomm-client':
return RfcommClient(device, channel=ctx.obj['rfcomm_channel'])
return RfcommClient(
device, channel=ctx.obj['rfcomm_channel'], uuid=ctx.obj['rfcomm_uuid']
)
if mode == 'rfcomm-server':
return RfcommServer(device, channel=ctx.obj['rfcomm_channel'])
@@ -1166,6 +1230,11 @@ def create_role_factory(ctx, default_role):
default=DEFAULT_RFCOMM_CHANNEL,
help='RFComm channel to use',
)
@click.option(
'--rfcomm-uuid',
default=DEFAULT_RFCOMM_UUID,
help='RFComm service UUID to use (ignored is --rfcomm-channel is not 0)',
)
@click.option(
'--l2cap-psm',
type=int,
@@ -1208,6 +1277,7 @@ def bench(
packet_count,
start_delay,
rfcomm_channel,
rfcomm_uuid,
l2cap_psm,
):
ctx.ensure_object(dict)
@@ -1216,6 +1286,7 @@ def bench(
ctx.obj['mode'] = mode
ctx.obj['att_mtu'] = att_mtu
ctx.obj['rfcomm_channel'] = rfcomm_channel
ctx.obj['rfcomm_uuid'] = rfcomm_uuid
ctx.obj['l2cap_psm'] = l2cap_psm
ctx.obj['packet_size'] = packet_size
ctx.obj['packet_count'] = packet_count