Merge branch 'google:main' into bumble_hid_device

This commit is contained in:
SneKarnataki
2023-11-23 04:03:53 +00:00
committed by GitHub
66 changed files with 2258 additions and 132 deletions
+4 -4
View File
@@ -53,10 +53,10 @@ def sdp_records():
# -----------------------------------------------------------------------------
# pylint: disable-next=too-many-nested-blocks
async def find_a2dp_service(device, connection):
async def find_a2dp_service(connection):
# Connect to the SDP Server
sdp_client = SDP_Client(device)
await sdp_client.connect(connection)
sdp_client = SDP_Client(connection)
await sdp_client.connect()
# Search for services with an Audio Sink service class
search_result = await sdp_client.search_attributes(
@@ -177,7 +177,7 @@ async def main():
print('*** Encryption on')
# Look for an A2DP service
avdtp_version = await find_a2dp_service(device, connection)
avdtp_version = await find_a2dp_service(connection)
if not avdtp_version:
print(color('!!! no AVDTP service found'))
return
+1 -3
View File
@@ -165,9 +165,7 @@ async def main():
print('*** Encryption on')
# Look for an A2DP service
avdtp_version = await find_avdtp_service_with_connection(
device, connection
)
avdtp_version = await find_avdtp_service_with_connection(connection)
if not avdtp_version:
print(color('!!! no A2DP service found'))
return
+2 -2
View File
@@ -63,8 +63,8 @@ async def main():
print(f'=== Connected to {connection.peer_address}!')
# Connect to the SDP Server
sdp_client = SDP_Client(device)
await sdp_client.connect(connection)
sdp_client = SDP_Client(connection)
await sdp_client.connect()
# List all services in the root browse group
service_record_handles = await sdp_client.search_services(
+3 -3
View File
@@ -49,8 +49,8 @@ logger = logging.getLogger(__name__)
# pylint: disable-next=too-many-nested-blocks
async def list_rfcomm_channels(device, connection):
# Connect to the SDP Server
sdp_client = SDP_Client(device)
await sdp_client.connect(connection)
sdp_client = SDP_Client(connection)
await sdp_client.connect()
# Search for services that support the Handsfree Profile
search_result = await sdp_client.search_attributes(
@@ -184,7 +184,7 @@ async def main():
# Create a client and start it
print('@@@ Starting to RFCOMM client...')
rfcomm_client = rfcomm.Client(device, connection)
rfcomm_client = rfcomm.Client(connection)
rfcomm_mux = await rfcomm_client.start()
print('@@@ Started')
+4 -9
View File
@@ -22,12 +22,9 @@ import logging
from bumble.colors import color
import bumble.core
from bumble.device import Device
from bumble.transport import open_transport_or_link
from bumble.core import (
BT_L2CAP_PROTOCOL_ID,
BT_HIDP_PROTOCOL_ID,
BT_HUMAN_INTERFACE_DEVICE_SERVICE,
BT_BR_EDR_TRANSPORT,
)
@@ -35,8 +32,6 @@ from bumble.hci import Address
from bumble.hid import Host, Message
from bumble.sdp import (
Client as SDP_Client,
DataElement,
ServiceAttribute,
SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
@@ -75,11 +70,11 @@ SDP_HID_SSR_HOST_MIN_TIMEOUT_ATTRIBUTE_ID = 0x0210
# -----------------------------------------------------------------------------
async def get_hid_device_sdp_record(device, connection):
async def get_hid_device_sdp_record(connection):
# Connect to the SDP Server
sdp_client = SDP_Client(device)
await sdp_client.connect(connection)
sdp_client = SDP_Client(connection)
await sdp_client.connect()
if sdp_client:
print(color('Connected to SDP Server', 'blue'))
else:
@@ -365,7 +360,7 @@ async def main():
await connection.encrypt()
print('*** Encryption on')
await get_hid_device_sdp_record(device, connection)
await get_hid_device_sdp_record(connection)
async def menu():
+5 -5
View File
@@ -42,10 +42,10 @@ from bumble.sdp import (
# -----------------------------------------------------------------------------
async def list_rfcomm_channels(device, connection):
async def list_rfcomm_channels(connection):
# Connect to the SDP Server
sdp_client = SDP_Client(device)
await sdp_client.connect(connection)
sdp_client = SDP_Client(connection)
await sdp_client.connect()
# Search for services with an L2CAP service attribute
search_result = await sdp_client.search_attributes(
@@ -194,7 +194,7 @@ async def main():
channel = sys.argv[4]
if channel == 'discover':
await list_rfcomm_channels(device, connection)
await list_rfcomm_channels(connection)
return
# Request authentication
@@ -209,7 +209,7 @@ async def main():
# Create a client and start it
print('@@@ Starting RFCOMM client...')
rfcomm_client = Client(device, connection)
rfcomm_client = Client(connection)
rfcomm_mux = await rfcomm_client.start()
print('@@@ Started')