remove unneeded constructor parameters

This commit is contained in:
Gilles Boccon-Gibod
2023-11-07 21:02:27 -08:00
parent 46239b321b
commit 268f6b0d51
12 changed files with 41 additions and 55 deletions

View File

@@ -197,12 +197,10 @@ def make_sdp_records(channel):
}
async def find_rfcomm_channel_with_uuid(
device: Device, connection: Connection, uuid: str
) -> int:
async def find_rfcomm_channel_with_uuid(connection: Connection, uuid: str) -> int:
# Connect to the SDP Server
sdp_client = SdpClient(device)
await sdp_client.connect(connection)
sdp_client = SdpClient(connection)
await sdp_client.connect()
# Search for services with an L2CAP service attribute
search_result = await sdp_client.search_attributes(
@@ -809,9 +807,7 @@ class RfcommClient(StreamedPacketIO):
print(
color(f'@@@ Discovering channel number from UUID {self.uuid}', 'cyan')
)
channel = await find_rfcomm_channel_with_uuid(
self.device, connection, self.uuid
)
channel = await find_rfcomm_channel_with_uuid(connection, self.uuid)
print(color(f'@@@ Channel number = {channel}', 'cyan'))
if channel == 0:
print(color('!!! No RFComm service with this UUID found', 'red'))
@@ -820,7 +816,7 @@ class RfcommClient(StreamedPacketIO):
# Create a client and start it
print(color('*** Starting RFCOMM client...', 'blue'))
rfcomm_client = bumble.rfcomm.Client(self.device, connection)
rfcomm_client = bumble.rfcomm.Client(connection)
rfcomm_mux = await rfcomm_client.start()
print(color('*** Started', 'blue'))

View File

@@ -250,15 +250,15 @@ async def find_avdtp_service_with_sdp_client(
# -----------------------------------------------------------------------------
async def find_avdtp_service_with_connection(
device: device.Device, connection: device.Connection
connection: device.Connection,
) -> Optional[Tuple[int, int]]:
'''
Find an AVDTP service, for a connection, and return its version,
or None if none is found
'''
sdp_client = sdp.Client(device)
await sdp_client.connect(connection)
sdp_client = sdp.Client(connection)
await sdp_client.connect()
service_version = await find_avdtp_service_with_sdp_client(sdp_client)
await sdp_client.disconnect()

View File

@@ -889,8 +889,7 @@ class Client:
multiplexer: Optional[Multiplexer]
l2cap_channel: Optional[l2cap.ClassicChannel]
def __init__(self, device: Device, connection: Connection) -> None:
self.device = device
def __init__(self, connection: Connection) -> None:
self.connection = connection
self.l2cap_channel = None
self.multiplexer = None
@@ -906,7 +905,7 @@ class Client:
raise
assert self.l2cap_channel is not None
# Create a mutliplexer to manage DLCs with the server
# Create a multiplexer to manage DLCs with the server
self.multiplexer = Multiplexer(self.l2cap_channel, Multiplexer.Role.INITIATOR)
# Connect the multiplexer

View File

@@ -760,13 +760,13 @@ class SDP_ServiceSearchAttributeResponse(SDP_PDU):
class Client:
channel: Optional[l2cap.ClassicChannel]
def __init__(self, device: Device) -> None:
self.device = device
def __init__(self, connection: Connection) -> None:
self.connection = connection
self.pending_request = None
self.channel = None
async def connect(self, connection: Connection) -> None:
self.channel = await connection.create_l2cap_channel(
async def connect(self) -> None:
self.channel = await self.connection.create_l2cap_channel(
spec=l2cap.ClassicChannelSpec(SDP_PSM)
)

View File

@@ -53,10 +53,10 @@ def sdp_records():
# -----------------------------------------------------------------------------
# pylint: disable-next=too-many-nested-blocks
async def find_a2dp_service(device, connection):
async def find_a2dp_service(connection):
# Connect to the SDP Server
sdp_client = SDP_Client(device)
await sdp_client.connect(connection)
sdp_client = SDP_Client(connection)
await sdp_client.connect()
# Search for services with an Audio Sink service class
search_result = await sdp_client.search_attributes(
@@ -177,7 +177,7 @@ async def main():
print('*** Encryption on')
# Look for an A2DP service
avdtp_version = await find_a2dp_service(device, connection)
avdtp_version = await find_a2dp_service(connection)
if not avdtp_version:
print(color('!!! no AVDTP service found'))
return

View File

@@ -165,9 +165,7 @@ async def main():
print('*** Encryption on')
# Look for an A2DP service
avdtp_version = await find_avdtp_service_with_connection(
device, connection
)
avdtp_version = await find_avdtp_service_with_connection(connection)
if not avdtp_version:
print(color('!!! no A2DP service found'))
return

View File

@@ -63,8 +63,8 @@ async def main():
print(f'=== Connected to {connection.peer_address}!')
# Connect to the SDP Server
sdp_client = SDP_Client(device)
await sdp_client.connect(connection)
sdp_client = SDP_Client(connection)
await sdp_client.connect()
# List all services in the root browse group
service_record_handles = await sdp_client.search_services(

View File

@@ -49,8 +49,8 @@ logger = logging.getLogger(__name__)
# pylint: disable-next=too-many-nested-blocks
async def list_rfcomm_channels(device, connection):
# Connect to the SDP Server
sdp_client = SDP_Client(device)
await sdp_client.connect(connection)
sdp_client = SDP_Client(connection)
await sdp_client.connect()
# Search for services that support the Handsfree Profile
search_result = await sdp_client.search_attributes(
@@ -184,7 +184,7 @@ async def main():
# Create a client and start it
print('@@@ Starting to RFCOMM client...')
rfcomm_client = rfcomm.Client(device, connection)
rfcomm_client = rfcomm.Client(connection)
rfcomm_mux = await rfcomm_client.start()
print('@@@ Started')

View File

@@ -22,12 +22,9 @@ import logging
from bumble.colors import color
import bumble.core
from bumble.device import Device
from bumble.transport import open_transport_or_link
from bumble.core import (
BT_L2CAP_PROTOCOL_ID,
BT_HIDP_PROTOCOL_ID,
BT_HUMAN_INTERFACE_DEVICE_SERVICE,
BT_BR_EDR_TRANSPORT,
)
@@ -35,8 +32,6 @@ from bumble.hci import Address
from bumble.hid import Host, Message
from bumble.sdp import (
Client as SDP_Client,
DataElement,
ServiceAttribute,
SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
@@ -75,11 +70,11 @@ SDP_HID_SSR_HOST_MIN_TIMEOUT_ATTRIBUTE_ID = 0x0210
# -----------------------------------------------------------------------------
async def get_hid_device_sdp_record(device, connection):
async def get_hid_device_sdp_record(connection):
# Connect to the SDP Server
sdp_client = SDP_Client(device)
await sdp_client.connect(connection)
sdp_client = SDP_Client(connection)
await sdp_client.connect()
if sdp_client:
print(color('Connected to SDP Server', 'blue'))
else:
@@ -348,7 +343,7 @@ async def main():
await connection.encrypt()
print('*** Encryption on')
await get_hid_device_sdp_record(device, connection)
await get_hid_device_sdp_record(connection)
# Create HID host and start it
print('@@@ Starting HID Host...')

View File

@@ -42,10 +42,10 @@ from bumble.sdp import (
# -----------------------------------------------------------------------------
async def list_rfcomm_channels(device, connection):
async def list_rfcomm_channels(connection):
# Connect to the SDP Server
sdp_client = SDP_Client(device)
await sdp_client.connect(connection)
sdp_client = SDP_Client(connection)
await sdp_client.connect()
# Search for services with an L2CAP service attribute
search_result = await sdp_client.search_attributes(
@@ -194,7 +194,7 @@ async def main():
channel = sys.argv[4]
if channel == 'discover':
await list_rfcomm_channels(device, connection)
await list_rfcomm_channels(connection)
return
# Request authentication
@@ -209,7 +209,7 @@ async def main():
# Create a client and start it
print('@@@ Starting RFCOMM client...')
rfcomm_client = Client(device, connection)
rfcomm_client = Client(connection)
rfcomm_mux = await rfcomm_client.start()
print('@@@ Started')

View File

@@ -43,12 +43,10 @@ async def make_hfp_connections(
# Setup RFCOMM channel
wait_dlc = asyncio.get_running_loop().create_future()
rfcomm_channel = rfcomm.Server(devices.devices[0]).listen(
lambda dlc: wait_dlc.set_result(dlc)
)
rfcomm_channel = rfcomm.Server(devices.devices[0]).listen(wait_dlc.set_result)
assert devices.connections[0]
assert devices.connections[1]
client_mux = await rfcomm.Client(devices.devices[1], devices.connections[1]).start()
client_mux = await rfcomm.Client(devices.connections[1]).start()
client_dlc = await client_mux.open_dlc(rfcomm_channel)
server_dlc = await wait_dlc

View File

@@ -215,8 +215,8 @@ async def test_service_search():
devices.devices[0].sdp_server.service_records.update(sdp_records())
# Search for service
client = Client(devices.devices[1])
await client.connect(devices.connections[1])
client = Client(devices.connections[1])
await client.connect()
services = await client.search_services(
[UUID('E6D55659-C8B4-4B85-96BB-B1143AF6D3AE')]
)
@@ -236,8 +236,8 @@ async def test_service_attribute():
devices.devices[0].sdp_server.service_records.update(sdp_records())
# Search for service
client = Client(devices.devices[1])
await client.connect(devices.connections[1])
client = Client(devices.connections[1])
await client.connect()
attributes = await client.get_attributes(
0x00010001, [SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID]
)
@@ -257,8 +257,8 @@ async def test_service_search_attribute():
devices.devices[0].sdp_server.service_records.update(sdp_records())
# Search for service
client = Client(devices.devices[1])
await client.connect(devices.connections[1])
client = Client(devices.connections[1])
await client.connect()
attributes = await client.search_attributes(
[UUID('E6D55659-C8B4-4B85-96BB-B1143AF6D3AE')], [(0x0000FFFF, 8)]
)