Compare commits

...

14 Commits

Author SHA1 Message Date
Charlie Boutier 1256170985 Pandora: refactor l2cap service
* Craft the PandoraChannel from the connection_handle and the source_cid
* Fix race on waitDisconnection
* Add ChannelContext to enable mutliple channels on the service
2024-09-03 15:52:40 +00:00
zxzxwu 4394a36332 Merge pull request #526 from Gopi-SB/oob
DH Key compute check modification for OOB Pairing
2024-08-29 16:56:45 +08:00
Gopi Sakshihally Bhuthaiah 0c9fd64434 DH Key compute check modification for OOB Pairing 2024-08-29 08:46:53 +00:00
Samad Atoro 2e99153696 Pandora: Add L2CAP service 2024-08-23 16:38:29 -07:00
zxzxwu 54a6f3cb36 Merge pull request #536 from zxzxwu/asha
Refactor ASHA service implementation and examples
2024-08-24 01:19:42 +08:00
Charlie Boutier 4a691c11d4 pyusb: allow to detect multiple usb dongle
Allow to detect multiple usb dongle by just provind the pid/vid
2024-08-23 08:22:43 -07:00
Gilles Boccon-Gibod b114c0d63f Merge pull request #539 from google/gbg/usb-thread-hotfix
hotfix for usb transport
2024-08-22 22:36:24 -07:00
Josh Wu 04311b4c90 Refactor ASHA service and integrate with examples 2024-08-22 12:53:19 +08:00
Gopi Sakshihally Bhuthaiah c44c89cc6e DH Key compute check modification for OOB Pairing 2024-08-13 02:10:41 +00:00
Gopi Sakshihally Bhuthaiah 414f2f3efb DH Key compute check modification for OOB Pairing 2024-08-12 07:00:51 +00:00
Gopi Sakshihally Bhuthaiah ed00d44ae1 DH Key compute check modification for OOB Pairing 2024-08-09 17:30:19 +00:00
Gopi Sakshihally Bhuthaiah b164524380 DH Key compute check modification for OOB Pairing 2024-08-08 10:31:26 +00:00
Gopi Sakshihally Bhuthaiah 29e4a843df DH Key compute check modification for OOB Pairing 2024-08-08 08:48:58 +00:00
Gopi Sakshihally Bhuthaiah 619b32d36e DH Key compute check modification for OOB Pairing 2024-08-08 07:53:05 +00:00
12 changed files with 993 additions and 366 deletions
+14 -10
View File
@@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Union
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
@@ -149,7 +151,7 @@ QMF_COEFFS = [3, -11, 12, 32, -210, 951, 3876, -805, 362, -156, 53, -11]
# -----------------------------------------------------------------------------
# Classes
# -----------------------------------------------------------------------------
class G722Decoder(object):
class G722Decoder:
"""G.722 decoder with bitrate 64kbit/s.
For the Blocks in the sub-band decoders, please refer to the G.722
@@ -157,7 +159,7 @@ class G722Decoder(object):
https://www.itu.int/rec/T-REC-G.722-201209-I
"""
def __init__(self):
def __init__(self) -> None:
self._x = [0] * 24
self._band = [Band(), Band()]
# The initial value in BLOCK 3L
@@ -165,12 +167,12 @@ class G722Decoder(object):
# The initial value in BLOCK 3H
self._band[1].det = 8
def decode_frame(self, encoded_data) -> bytearray:
def decode_frame(self, encoded_data: Union[bytes, bytearray]) -> bytearray:
result_array = bytearray(len(encoded_data) * 4)
self.g722_decode(result_array, encoded_data)
return result_array
def g722_decode(self, result_array, encoded_data) -> int:
def g722_decode(self, result_array, encoded_data: Union[bytes, bytearray]) -> int:
"""Decode the data frame using g722 decoder."""
result_length = 0
@@ -198,14 +200,16 @@ class G722Decoder(object):
return result_length
def update_decoded_result(self, xout, byte_length, byte_array) -> int:
def update_decoded_result(
self, xout: int, byte_length: int, byte_array: bytearray
) -> int:
result = (int)(xout >> 11)
bytes_result = result.to_bytes(2, 'little', signed=True)
byte_array[byte_length] = bytes_result[0]
byte_array[byte_length + 1] = bytes_result[1]
return byte_length + 2
def lower_sub_band_decoder(self, lower_bits) -> int:
def lower_sub_band_decoder(self, lower_bits: int) -> int:
"""Lower sub-band decoder for last six bits."""
# Block 5L
@@ -258,7 +262,7 @@ class G722Decoder(object):
return rlow
def higher_sub_band_decoder(self, higher_bits) -> int:
def higher_sub_band_decoder(self, higher_bits: int) -> int:
"""Higher sub-band decoder for first two bits."""
# Block 2H
@@ -306,14 +310,14 @@ class G722Decoder(object):
# -----------------------------------------------------------------------------
class Band(object):
"""Structure for G722 decode proccessing."""
class Band:
"""Structure for G722 decode processing."""
s: int = 0
nb: int = 0
det: int = 0
def __init__(self):
def __init__(self) -> None:
self._sp = 0
self._sz = 0
self._r = [0] * 3
+3
View File
@@ -25,8 +25,10 @@ import grpc.aio
from .config import Config
from .device import PandoraDevice
from .host import HostService
from .l2cap import L2CAPService
from .security import SecurityService, SecurityStorageService
from pandora.host_grpc_aio import add_HostServicer_to_server
from pandora.l2cap_grpc_aio import add_L2CAPServicer_to_server
from pandora.security_grpc_aio import (
add_SecurityServicer_to_server,
add_SecurityStorageServicer_to_server,
@@ -77,6 +79,7 @@ async def serve(
add_SecurityStorageServicer_to_server(
SecurityStorageService(bumble.device, config), server
)
add_L2CAPServicer_to_server(L2CAPService(bumble.device, config), server)
# call hooks if any.
for hook in _SERVICERS_HOOKS:
+310
View File
@@ -0,0 +1,310 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import asyncio
import grpc
import json
import logging
from asyncio import Queue as AsyncQueue, Future
from . import utils
from .config import Config
from bumble.core import OutOfResourcesError, InvalidArgumentError
from bumble.device import Device
from bumble.l2cap import (
ClassicChannel,
ClassicChannelServer,
ClassicChannelSpec,
LeCreditBasedChannel,
LeCreditBasedChannelServer,
LeCreditBasedChannelSpec,
)
from google.protobuf import any_pb2, empty_pb2 # pytype: disable=pyi-error
from pandora.l2cap_grpc_aio import L2CAPServicer # pytype: disable=pyi-error
from pandora.l2cap_pb2 import ( # pytype: disable=pyi-error
COMMAND_NOT_UNDERSTOOD,
INVALID_CID_IN_REQUEST,
Channel as PandoraChannel,
ConnectRequest,
ConnectResponse,
CreditBasedChannelRequest,
DisconnectRequest,
DisconnectResponse,
ReceiveRequest,
ReceiveResponse,
SendRequest,
SendResponse,
WaitConnectionRequest,
WaitConnectionResponse,
WaitDisconnectionRequest,
WaitDisconnectionResponse,
)
from typing import AsyncGenerator, Dict, Optional, Union
from dataclasses import dataclass
L2capChannel = Union[ClassicChannel, LeCreditBasedChannel]
@dataclass
class ChannelContext:
close_future: Future
sdu_queue: AsyncQueue
class L2CAPService(L2CAPServicer):
def __init__(self, device: Device, config: Config) -> None:
self.log = utils.BumbleServerLoggerAdapter(
logging.getLogger(), {'service_name': 'L2CAP', 'device': device}
)
self.device = device
self.config = config
self.channels: Dict[bytes, ChannelContext] = {}
def register_event(self, l2cap_channel: L2capChannel) -> ChannelContext:
close_future = asyncio.get_running_loop().create_future()
sdu_queue: AsyncQueue = AsyncQueue()
def on_channel_sdu(sdu):
sdu_queue.put_nowait(sdu)
def on_close():
close_future.set_result(None)
l2cap_channel.sink = on_channel_sdu
l2cap_channel.on('close', on_close)
return ChannelContext(close_future, sdu_queue)
@utils.rpc
async def WaitConnection(
self, request: WaitConnectionRequest, context: grpc.ServicerContext
) -> WaitConnectionResponse:
self.log.debug('WaitConnection')
if not request.connection:
raise ValueError('A valid connection field must be set')
# find connection on device based on connection cookie value
connection_handle = int.from_bytes(request.connection.cookie.value, 'big')
connection = self.device.lookup_connection(connection_handle)
if not connection:
raise ValueError('The connection specified is invalid.')
oneof = request.WhichOneof('type')
self.log.debug(f'WaitConnection channel request type: {oneof}.')
channel_type = getattr(request, oneof)
spec: Optional[Union[ClassicChannelSpec, LeCreditBasedChannelSpec]] = None
l2cap_server: Optional[
Union[ClassicChannelServer, LeCreditBasedChannelServer]
] = None
if isinstance(channel_type, CreditBasedChannelRequest):
spec = LeCreditBasedChannelSpec(
psm=channel_type.spsm,
max_credits=channel_type.initial_credit,
mtu=channel_type.mtu,
mps=channel_type.mps,
)
if channel_type.spsm in self.device.l2cap_channel_manager.le_coc_servers:
l2cap_server = self.device.l2cap_channel_manager.le_coc_servers[
channel_type.spsm
]
else:
spec = ClassicChannelSpec(
psm=channel_type.psm,
mtu=channel_type.mtu,
)
if channel_type.psm in self.device.l2cap_channel_manager.servers:
l2cap_server = self.device.l2cap_channel_manager.servers[
channel_type.psm
]
self.log.info(f'Listening for L2CAP connection on PSM {spec.psm}')
channel_future: Future[PandoraChannel] = (
asyncio.get_running_loop().create_future()
)
def on_l2cap_channel(l2cap_channel: L2capChannel):
try:
channel_context = self.register_event(l2cap_channel)
pandora_channel: PandoraChannel = self.craft_pandora_channel(
connection_handle, l2cap_channel
)
self.channels[pandora_channel.cookie.value] = channel_context
channel_future.set_result(pandora_channel)
except Exception as e:
self.log.error(f'Failed to set channel future: {e}')
if l2cap_server is None:
l2cap_server = self.device.create_l2cap_server(
spec=spec, handler=on_l2cap_channel
)
else:
l2cap_server.on('connection', on_l2cap_channel)
try:
self.log.debug('Waiting for a channel connection.')
pandora_channel: PandoraChannel = await channel_future
return WaitConnectionResponse(channel=pandora_channel)
except Exception as e:
self.log.warning(f'Exception: {e}')
return WaitConnectionResponse(error=COMMAND_NOT_UNDERSTOOD)
@utils.rpc
async def WaitDisconnection(
self, request: WaitDisconnectionRequest, context: grpc.ServicerContext
) -> WaitDisconnectionResponse:
try:
self.log.debug('WaitDisconnection')
await self.lookup_context(request.channel).close_future
self.log.debug("return WaitDisconnectionResponse")
return WaitDisconnectionResponse(success=empty_pb2.Empty())
except KeyError as e:
self.log.warning(f'WaitDisconnection: Unable to find the channel: {e}')
return WaitDisconnectionResponse(error=INVALID_CID_IN_REQUEST)
except Exception as e:
self.log.exception(f'WaitDisonnection failed: {e}')
return WaitDisconnectionResponse(error=COMMAND_NOT_UNDERSTOOD)
@utils.rpc
async def Receive(
self, request: ReceiveRequest, context: grpc.ServicerContext
) -> AsyncGenerator[ReceiveResponse, None]:
self.log.debug('Receive')
oneof = request.WhichOneof('source')
self.log.debug(f'Source: {oneof}.')
pandora_channel = getattr(request, oneof)
sdu_queue = self.lookup_context(pandora_channel).sdu_queue
while sdu := await sdu_queue.get():
self.log.debug(f'Receive: Received {len(sdu)} bytes -> {sdu.decode()}')
response = ReceiveResponse(data=sdu)
yield response
@utils.rpc
async def Connect(
self, request: ConnectRequest, context: grpc.ServicerContext
) -> ConnectResponse:
self.log.debug('Connect')
if not request.connection:
raise ValueError('A valid connection field must be set')
# find connection on device based on connection cookie value
connection_handle = int.from_bytes(request.connection.cookie.value, 'big')
connection = self.device.lookup_connection(connection_handle)
if not connection:
raise ValueError('The connection specified is invalid.')
oneof = request.WhichOneof('type')
self.log.debug(f'Channel request type: {oneof}.')
channel_type = getattr(request, oneof)
spec: Optional[Union[ClassicChannelSpec, LeCreditBasedChannelSpec]] = None
if isinstance(channel_type, CreditBasedChannelRequest):
spec = LeCreditBasedChannelSpec(
psm=channel_type.spsm,
max_credits=channel_type.initial_credit,
mtu=channel_type.mtu,
mps=channel_type.mps,
)
else:
spec = ClassicChannelSpec(
psm=channel_type.psm,
mtu=channel_type.mtu,
)
try:
self.log.info(f'Opening L2CAP channel on PSM = {spec.psm}')
l2cap_channel = await connection.create_l2cap_channel(spec=spec)
channel_context = self.register_event(l2cap_channel)
pandora_channel = self.craft_pandora_channel(
connection_handle, l2cap_channel
)
self.channels[pandora_channel.cookie.value] = channel_context
return ConnectResponse(channel=pandora_channel)
except OutOfResourcesError as e:
self.log.error(e)
return ConnectResponse(error=INVALID_CID_IN_REQUEST)
except InvalidArgumentError as e:
self.log.error(e)
return ConnectResponse(error=COMMAND_NOT_UNDERSTOOD)
@utils.rpc
async def Disconnect(
self, request: DisconnectRequest, context: grpc.ServicerContext
) -> DisconnectResponse:
try:
self.log.debug('Disconnect')
l2cap_channel = self.lookup_channel(request.channel)
if not l2cap_channel:
self.log.warning('Disconnect: Unable to find the channel')
return DisconnectResponse(error=INVALID_CID_IN_REQUEST)
await l2cap_channel.disconnect()
return DisconnectResponse(success=empty_pb2.Empty())
except Exception as e:
self.log.exception(f'Disonnect failed: {e}')
return DisconnectResponse(error=COMMAND_NOT_UNDERSTOOD)
@utils.rpc
async def Send(
self, request: SendRequest, context: grpc.ServicerContext
) -> SendResponse:
self.log.debug('Send')
try:
oneof = request.WhichOneof('sink')
self.log.debug(f'Sink: {oneof}.')
pandora_channel = getattr(request, oneof)
l2cap_channel = self.lookup_channel(pandora_channel)
if not l2cap_channel:
return SendResponse(error=COMMAND_NOT_UNDERSTOOD)
if isinstance(l2cap_channel, ClassicChannel):
l2cap_channel.send_pdu(request.data)
else:
l2cap_channel.write(request.data)
return SendResponse(success=empty_pb2.Empty())
except Exception as e:
self.log.exception(f'Disonnect failed: {e}')
return SendResponse(error=COMMAND_NOT_UNDERSTOOD)
def craft_pandora_channel(
self,
connection_handle: int,
l2cap_channel: L2capChannel,
) -> PandoraChannel:
parameters = {
"connection_handle": connection_handle,
"source_cid": l2cap_channel.source_cid,
}
cookie = any_pb2.Any()
cookie.value = json.dumps(parameters).encode()
return PandoraChannel(cookie=cookie)
def lookup_channel(self, pandora_channel: PandoraChannel) -> L2capChannel:
(connection_handle, source_cid) = json.loads(
pandora_channel.cookie.value
).values()
return self.device.l2cap_channel_manager.channels[connection_handle][source_cid]
def lookup_context(self, pandora_channel: PandoraChannel) -> ChannelContext:
return self.channels[pandora_channel.cookie.value]
+295
View File
@@ -0,0 +1,295 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import enum
import struct
import logging
from typing import List, Optional, Callable, Union, Any
from bumble import l2cap
from bumble import utils
from bumble import gatt
from bumble import gatt_client
from bumble.core import AdvertisingData
from bumble.device import Device, Connection
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
_logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
class DeviceCapabilities(enum.IntFlag):
IS_RIGHT = 0x01
IS_DUAL = 0x02
CSIS_SUPPORTED = 0x04
class FeatureMap(enum.IntFlag):
LE_COC_AUDIO_OUTPUT_STREAMING_SUPPORTED = 0x01
class AudioType(utils.OpenIntEnum):
UNKNOWN = 0x00
RINGTONE = 0x01
PHONE_CALL = 0x02
MEDIA = 0x03
class OpCode(utils.OpenIntEnum):
START = 1
STOP = 2
STATUS = 3
class Codec(utils.OpenIntEnum):
G_722_16KHZ = 1
class SupportedCodecs(enum.IntFlag):
G_722_16KHZ = 1 << Codec.G_722_16KHZ
class PeripheralStatus(utils.OpenIntEnum):
"""Status update on the other peripheral."""
OTHER_PERIPHERAL_DISCONNECTED = 1
OTHER_PERIPHERAL_CONNECTED = 2
CONNECTION_PARAMETER_UPDATED = 3
class AudioStatus(utils.OpenIntEnum):
"""Status report field for the audio control point."""
OK = 0
UNKNOWN_COMMAND = -1
ILLEGAL_PARAMETERS = -2
# -----------------------------------------------------------------------------
class AshaService(gatt.TemplateService):
UUID = gatt.GATT_ASHA_SERVICE
audio_sink: Optional[Callable[[bytes], Any]]
active_codec: Optional[Codec] = None
audio_type: Optional[AudioType] = None
volume: Optional[int] = None
other_state: Optional[int] = None
connection: Optional[Connection] = None
def __init__(
self,
capability: int,
hisyncid: Union[List[int], bytes],
device: Device,
psm: int = 0,
audio_sink: Optional[Callable[[bytes], Any]] = None,
feature_map: int = FeatureMap.LE_COC_AUDIO_OUTPUT_STREAMING_SUPPORTED,
protocol_version: int = 0x01,
render_delay_milliseconds: int = 0,
supported_codecs: int = SupportedCodecs.G_722_16KHZ,
) -> None:
if len(hisyncid) != 8:
_logger.warning('HiSyncId should have a length of 8, got %d', len(hisyncid))
self.hisyncid = bytes(hisyncid)
self.capability = capability
self.device = device
self.audio_out_data = b''
self.psm = psm # a non-zero psm is mainly for testing purpose
self.audio_sink = audio_sink
self.protocol_version = protocol_version
self.read_only_properties_characteristic = gatt.Characteristic(
gatt.GATT_ASHA_READ_ONLY_PROPERTIES_CHARACTERISTIC,
gatt.Characteristic.Properties.READ,
gatt.Characteristic.READABLE,
struct.pack(
"<BB8sBH2sH",
protocol_version,
capability,
self.hisyncid,
feature_map,
render_delay_milliseconds,
b'\x00\x00',
supported_codecs,
),
)
self.audio_control_point_characteristic = gatt.Characteristic(
gatt.GATT_ASHA_AUDIO_CONTROL_POINT_CHARACTERISTIC,
gatt.Characteristic.Properties.WRITE
| gatt.Characteristic.Properties.WRITE_WITHOUT_RESPONSE,
gatt.Characteristic.WRITEABLE,
gatt.CharacteristicValue(write=self._on_audio_control_point_write),
)
self.audio_status_characteristic = gatt.Characteristic(
gatt.GATT_ASHA_AUDIO_STATUS_CHARACTERISTIC,
gatt.Characteristic.Properties.READ | gatt.Characteristic.Properties.NOTIFY,
gatt.Characteristic.READABLE,
bytes([AudioStatus.OK]),
)
self.volume_characteristic = gatt.Characteristic(
gatt.GATT_ASHA_VOLUME_CHARACTERISTIC,
gatt.Characteristic.Properties.WRITE_WITHOUT_RESPONSE,
gatt.Characteristic.WRITEABLE,
gatt.CharacteristicValue(write=self._on_volume_write),
)
# let the server find a free PSM
self.psm = device.create_l2cap_server(
spec=l2cap.LeCreditBasedChannelSpec(psm=self.psm, max_credits=8),
handler=self._on_connection,
).psm
self.le_psm_out_characteristic = gatt.Characteristic(
gatt.GATT_ASHA_LE_PSM_OUT_CHARACTERISTIC,
gatt.Characteristic.Properties.READ,
gatt.Characteristic.READABLE,
struct.pack('<H', self.psm),
)
characteristics = [
self.read_only_properties_characteristic,
self.audio_control_point_characteristic,
self.audio_status_characteristic,
self.volume_characteristic,
self.le_psm_out_characteristic,
]
super().__init__(characteristics)
def get_advertising_data(self) -> bytes:
# Advertisement only uses 4 least significant bytes of the HiSyncId.
return bytes(
AdvertisingData(
[
(
AdvertisingData.SERVICE_DATA_16_BIT_UUID,
bytes(gatt.GATT_ASHA_SERVICE)
+ bytes([self.protocol_version, self.capability])
+ self.hisyncid[:4],
),
]
)
)
# Handler for audio control commands
async def _on_audio_control_point_write(
self, connection: Optional[Connection], value: bytes
) -> None:
_logger.debug(f'--- AUDIO CONTROL POINT Write:{value.hex()}')
opcode = value[0]
if opcode == OpCode.START:
# Start
self.active_codec = Codec(value[1])
self.audio_type = AudioType(value[2])
self.volume = value[3]
self.other_state = value[4]
_logger.debug(
f'### START: codec={self.active_codec.name}, '
f'audio_type={self.audio_type.name}, '
f'volume={self.volume}, '
f'other_state={self.other_state}'
)
self.emit('started')
elif opcode == OpCode.STOP:
_logger.debug('### STOP')
self.active_codec = None
self.audio_type = None
self.volume = None
self.other_state = None
self.emit('stopped')
elif opcode == OpCode.STATUS:
_logger.debug('### STATUS: %s', PeripheralStatus(value[1]).name)
if self.connection is None and connection:
self.connection = connection
def on_disconnection(_reason) -> None:
self.connection = None
self.active_codec = None
self.audio_type = None
self.volume = None
self.other_state = None
self.emit('disconnected')
connection.once('disconnection', on_disconnection)
# OPCODE_STATUS does not need audio status point update
if opcode != OpCode.STATUS:
await self.device.notify_subscribers(
self.audio_status_characteristic, force=True
)
# Handler for volume control
def _on_volume_write(self, connection: Optional[Connection], value: bytes) -> None:
_logger.debug(f'--- VOLUME Write:{value[0]}')
self.volume = value[0]
self.emit('volume_changed')
# Register an L2CAP CoC server
def _on_connection(self, channel: l2cap.LeCreditBasedChannel) -> None:
def on_data(data: bytes) -> None:
if self.audio_sink: # pylint: disable=not-callable
self.audio_sink(data)
channel.sink = on_data
# -----------------------------------------------------------------------------
class AshaServiceProxy(gatt_client.ProfileServiceProxy):
SERVICE_CLASS = AshaService
read_only_properties_characteristic: gatt_client.CharacteristicProxy
audio_control_point_characteristic: gatt_client.CharacteristicProxy
audio_status_point_characteristic: gatt_client.CharacteristicProxy
volume_characteristic: gatt_client.CharacteristicProxy
psm_characteristic: gatt_client.CharacteristicProxy
def __init__(self, service_proxy: gatt_client.ServiceProxy) -> None:
self.service_proxy = service_proxy
for uuid, attribute_name in (
(
gatt.GATT_ASHA_READ_ONLY_PROPERTIES_CHARACTERISTIC,
'read_only_properties_characteristic',
),
(
gatt.GATT_ASHA_AUDIO_CONTROL_POINT_CHARACTERISTIC,
'audio_control_point_characteristic',
),
(
gatt.GATT_ASHA_AUDIO_STATUS_CHARACTERISTIC,
'audio_status_point_characteristic',
),
(
gatt.GATT_ASHA_VOLUME_CHARACTERISTIC,
'volume_characteristic',
),
(
gatt.GATT_ASHA_LE_PSM_OUT_CHARACTERISTIC,
'psm_characteristic',
),
):
if not (
characteristics := self.service_proxy.get_characteristics_by_uuid(uuid)
):
raise gatt.InvalidServiceError(f"Missing {uuid} Characteristic")
setattr(self, attribute_name, characteristics[0])
-193
View File
@@ -1,193 +0,0 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import struct
import logging
from typing import List, Optional
from bumble import l2cap
from ..core import AdvertisingData
from ..device import Device, Connection
from ..gatt import (
GATT_ASHA_SERVICE,
GATT_ASHA_READ_ONLY_PROPERTIES_CHARACTERISTIC,
GATT_ASHA_AUDIO_CONTROL_POINT_CHARACTERISTIC,
GATT_ASHA_AUDIO_STATUS_CHARACTERISTIC,
GATT_ASHA_VOLUME_CHARACTERISTIC,
GATT_ASHA_LE_PSM_OUT_CHARACTERISTIC,
TemplateService,
Characteristic,
CharacteristicValue,
)
from ..utils import AsyncRunner
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
class AshaService(TemplateService):
UUID = GATT_ASHA_SERVICE
OPCODE_START = 1
OPCODE_STOP = 2
OPCODE_STATUS = 3
PROTOCOL_VERSION = 0x01
RESERVED_FOR_FUTURE_USE = [00, 00]
FEATURE_MAP = [0x01] # [LE CoC audio output streaming supported]
SUPPORTED_CODEC_ID = [0x02, 0x01] # Codec IDs [G.722 at 16 kHz]
RENDER_DELAY = [00, 00]
def __init__(self, capability: int, hisyncid: List[int], device: Device, psm=0):
self.hisyncid = hisyncid
self.capability = capability # Device Capabilities [Left, Monaural]
self.device = device
self.audio_out_data = b''
self.psm = psm # a non-zero psm is mainly for testing purpose
# Handler for volume control
def on_volume_write(connection, value):
logger.info(f'--- VOLUME Write:{value[0]}')
self.emit('volume', connection, value[0])
# Handler for audio control commands
def on_audio_control_point_write(connection: Optional[Connection], value):
logger.info(f'--- AUDIO CONTROL POINT Write:{value.hex()}')
opcode = value[0]
if opcode == AshaService.OPCODE_START:
# Start
audio_type = ('Unknown', 'Ringtone', 'Phone Call', 'Media')[value[2]]
logger.info(
f'### START: codec={value[1]}, '
f'audio_type={audio_type}, '
f'volume={value[3]}, '
f'otherstate={value[4]}'
)
self.emit(
'start',
connection,
{
'codec': value[1],
'audiotype': value[2],
'volume': value[3],
'otherstate': value[4],
},
)
elif opcode == AshaService.OPCODE_STOP:
logger.info('### STOP')
self.emit('stop', connection)
elif opcode == AshaService.OPCODE_STATUS:
logger.info(f'### STATUS: connected={value[1]}')
# OPCODE_STATUS does not need audio status point update
if opcode != AshaService.OPCODE_STATUS:
AsyncRunner.spawn(
device.notify_subscribers(
self.audio_status_characteristic, force=True
)
)
self.read_only_properties_characteristic = Characteristic(
GATT_ASHA_READ_ONLY_PROPERTIES_CHARACTERISTIC,
Characteristic.Properties.READ,
Characteristic.READABLE,
bytes(
[
AshaService.PROTOCOL_VERSION, # Version
self.capability,
]
)
+ bytes(self.hisyncid)
+ bytes(AshaService.FEATURE_MAP)
+ bytes(AshaService.RENDER_DELAY)
+ bytes(AshaService.RESERVED_FOR_FUTURE_USE)
+ bytes(AshaService.SUPPORTED_CODEC_ID),
)
self.audio_control_point_characteristic = Characteristic(
GATT_ASHA_AUDIO_CONTROL_POINT_CHARACTERISTIC,
Characteristic.Properties.WRITE
| Characteristic.Properties.WRITE_WITHOUT_RESPONSE,
Characteristic.WRITEABLE,
CharacteristicValue(write=on_audio_control_point_write),
)
self.audio_status_characteristic = Characteristic(
GATT_ASHA_AUDIO_STATUS_CHARACTERISTIC,
Characteristic.Properties.READ | Characteristic.Properties.NOTIFY,
Characteristic.READABLE,
bytes([0]),
)
self.volume_characteristic = Characteristic(
GATT_ASHA_VOLUME_CHARACTERISTIC,
Characteristic.Properties.WRITE_WITHOUT_RESPONSE,
Characteristic.WRITEABLE,
CharacteristicValue(write=on_volume_write),
)
# Register an L2CAP CoC server
def on_coc(channel):
def on_data(data):
logging.debug(f'<<< data received:{data}')
self.emit('data', channel.connection, data)
self.audio_out_data += data
channel.sink = on_data
# let the server find a free PSM
self.psm = device.create_l2cap_server(
spec=l2cap.LeCreditBasedChannelSpec(psm=self.psm, max_credits=8),
handler=on_coc,
).psm
self.le_psm_out_characteristic = Characteristic(
GATT_ASHA_LE_PSM_OUT_CHARACTERISTIC,
Characteristic.Properties.READ,
Characteristic.READABLE,
struct.pack('<H', self.psm),
)
characteristics = [
self.read_only_properties_characteristic,
self.audio_control_point_characteristic,
self.audio_status_characteristic,
self.volume_characteristic,
self.le_psm_out_characteristic,
]
super().__init__(characteristics)
def get_advertising_data(self):
# Advertisement only uses 4 least significant bytes of the HiSyncId.
return bytes(
AdvertisingData(
[
(
AdvertisingData.SERVICE_DATA_16_BIT_UUID,
bytes(GATT_ASHA_SERVICE)
+ bytes(
[
AshaService.PROTOCOL_VERSION,
self.capability,
]
)
+ bytes(self.hisyncid[:4]),
),
]
)
)
+23 -4
View File
@@ -764,7 +764,9 @@ class Session:
self.peer_io_capability = SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY
# OOB
self.oob_data_flag = 0 if pairing_config.oob is None else 1
self.oob_data_flag = (
1 if pairing_config.oob and pairing_config.oob.peer_data else 0
)
# Set up addresses
self_address = connection.self_resolvable_address or connection.self_address
@@ -1014,8 +1016,10 @@ class Session:
self.send_command(response)
def send_pairing_confirm_command(self) -> None:
self.r = crypto.r()
logger.debug(f'generated random: {self.r.hex()}')
if self.pairing_method != PairingMethod.OOB:
self.r = crypto.r()
logger.debug(f'generated random: {self.r.hex()}')
if self.sc:
@@ -1735,7 +1739,6 @@ class Session:
if self.pairing_method in (
PairingMethod.JUST_WORKS,
PairingMethod.NUMERIC_COMPARISON,
PairingMethod.OOB,
):
ra = bytes(16)
rb = ra
@@ -1743,6 +1746,22 @@ class Session:
assert self.passkey
ra = self.passkey.to_bytes(16, byteorder='little')
rb = ra
elif self.pairing_method == PairingMethod.OOB:
if self.is_initiator:
if self.peer_oob_data:
rb = self.peer_oob_data.r
ra = self.r
else:
rb = bytes(16)
ra = self.r
else:
if self.peer_oob_data:
ra = self.peer_oob_data.r
rb = self.r
else:
ra = bytes(16)
rb = self.r
else:
return
+19 -2
View File
@@ -23,7 +23,7 @@ import time
import usb.core
import usb.util
from typing import Optional
from typing import Optional, Set
from usb.core import Device as UsbDevice
from usb.core import USBError
from usb.util import CTRL_TYPE_CLASS, CTRL_RECIPIENT_OTHER
@@ -46,6 +46,11 @@ RESET_DELAY = 3
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# Global
# -----------------------------------------------------------------------------
devices_in_use: Set[int] = set()
# -----------------------------------------------------------------------------
async def open_pyusb_transport(spec: str) -> Transport:
@@ -216,6 +221,7 @@ async def open_pyusb_transport(spec: str) -> Transport:
async def close(self):
await self.source.stop()
await self.sink.stop()
devices_in_use.remove(device.address)
usb.util.release_interface(self.device, 0)
usb_find = usb.core.find
@@ -233,7 +239,18 @@ async def open_pyusb_transport(spec: str) -> Transport:
spec = spec[1:]
if ':' in spec:
vendor_id, product_id = spec.split(':')
device = usb_find(idVendor=int(vendor_id, 16), idProduct=int(product_id, 16))
device = None
devices = usb_find(
find_all=True, idVendor=int(vendor_id, 16), idProduct=int(product_id, 16)
)
for d in devices:
if d.address in devices_in_use:
continue
device = d
devices_in_use.add(d.address)
break
if device is None:
raise ValueError('device already in use')
elif '-' in spec:
def device_path(device):
+95
View File
@@ -0,0 +1,95 @@
<html data-bs-theme="dark">
<head>
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.2/dist/css/bootstrap.min.css" rel="stylesheet"
integrity="sha384-T3c6CoIi6uLrA9TneNEoa7RxnatzjcDSCmG1MXxSR1GAsXEV/Dwwykc2MPK8M2HN" crossorigin="anonymous">
<script src="https://unpkg.com/pcm-player"></script>
</head>
<body>
<nav class="navbar navbar-dark bg-primary">
<div class="container">
<span class="navbar-brand mb-0 h1">Bumble ASHA Sink</span>
</div>
</nav>
<br>
<div class="container">
<div class="row">
<div class="col-auto">
<button id="connect-audio" class="btn btn-danger" onclick="connectAudio()">Connect Audio</button>
</div>
</div>
<hr>
<div class="row">
<div class="col-4">
<label class="form-label">Browser Gain</label>
<input type="range" class="form-range" id="browser-gain" min="0" max="2" value="1" step="0.1"
onchange="setGain()">
</div>
</div>
<hr>
<div id="socketStateContainer" class="bg-body-tertiary p-3 rounded-2">
<h3>Log</h3>
<code id="log" style="white-space: pre-line;"></code>
</div>
</div>
<script>
let atResponseInput = document.getElementById("at_response")
let gainInput = document.getElementById('browser-gain')
let log = document.getElementById("log")
let socket = new WebSocket('ws://localhost:8888');
let sampleRate = 0;
let player;
socket.binaryType = "arraybuffer";
socket.onopen = _ => {
log.textContent += 'SOCKET OPEN\n'
}
socket.onclose = _ => {
log.textContent += 'SOCKET CLOSED\n'
}
socket.onerror = (error) => {
log.textContent += 'SOCKET ERROR\n'
console.log(`ERROR: ${error}`)
}
socket.onmessage = function (message) {
if (typeof message.data === 'string' || message.data instanceof String) {
log.textContent += `<-- ${event.data}\n`
} else {
// BINARY audio data.
if (player == null) return;
player.feed(message.data);
}
};
function connectAudio() {
player = new PCMPlayer({
inputCodec: 'Int16',
channels: 1,
sampleRate: 16000,
flushTime: 20,
});
player.volume(gainInput.value);
const button = document.getElementById("connect-audio")
button.disabled = true;
button.textContent = "Audio Connected";
}
function setGain() {
if (player != null) {
player.volume(gainInput.value);
}
}
</script>
</div>
</body>
</html>
+2 -1
View File
@@ -1,5 +1,6 @@
{
"name": "Bumble Aid Left",
"address": "F1:F2:F3:F4:F5:F6",
"identity_address_type": 1,
"keystore": "JsonKeyStore"
}
}
+2 -1
View File
@@ -1,5 +1,6 @@
{
"name": "Bumble Aid Right",
"address": "F7:F8:F9:FA:FB:FC",
"identity_address_type": 1,
"keystore": "JsonKeyStore"
}
}
+67 -155
View File
@@ -16,192 +16,104 @@
# Imports
# -----------------------------------------------------------------------------
import asyncio
import struct
import sys
import os
import logging
import websockets
from bumble import l2cap
from typing import Optional
from bumble import decoder
from bumble import gatt
from bumble.core import AdvertisingData
from bumble.device import Device
from bumble.device import Device, AdvertisingParameters
from bumble.transport import open_transport_or_link
from bumble.core import UUID
from bumble.gatt import Service, Characteristic, CharacteristicValue
from bumble.profiles import asha
ws_connection: Optional[websockets.WebSocketServerProtocol] = None
g722_decoder = decoder.G722Decoder()
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
ASHA_SERVICE = UUID.from_16_bits(0xFDF0, 'Audio Streaming for Hearing Aid')
ASHA_READ_ONLY_PROPERTIES_CHARACTERISTIC = UUID(
'6333651e-c481-4a3e-9169-7c902aad37bb', 'ReadOnlyProperties'
)
ASHA_AUDIO_CONTROL_POINT_CHARACTERISTIC = UUID(
'f0d4de7e-4a88-476c-9d9f-1937b0996cc0', 'AudioControlPoint'
)
ASHA_AUDIO_STATUS_CHARACTERISTIC = UUID(
'38663f1a-e711-4cac-b641-326b56404837', 'AudioStatus'
)
ASHA_VOLUME_CHARACTERISTIC = UUID('00e4ca9e-ab14-41e4-8823-f9e70c7e91df', 'Volume')
ASHA_LE_PSM_OUT_CHARACTERISTIC = UUID(
'2d410339-82b6-42aa-b34e-e2e01df8cc1a', 'LE_PSM_OUT'
)
async def ws_server(ws_client: websockets.WebSocketServerProtocol, path: str):
del path
global ws_connection
ws_connection = ws_client
async for message in ws_client:
print(message)
# -----------------------------------------------------------------------------
async def main() -> None:
if len(sys.argv) != 4:
print(
'Usage: python run_asha_sink.py <device-config> <transport-spec> '
'<audio-file>'
)
print('example: python run_asha_sink.py device1.json usb:0 audio_out.g722')
if len(sys.argv) != 3:
print('Usage: python run_asha_sink.py <device-config> <transport-spec>')
print('example: python run_asha_sink.py device1.json usb:0')
return
audio_out = open(sys.argv[3], 'wb')
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
# Handler for audio control commands
def on_audio_control_point_write(_connection, value):
print('--- AUDIO CONTROL POINT Write:', value.hex())
opcode = value[0]
if opcode == 1:
# Start
audio_type = ('Unknown', 'Ringtone', 'Phone Call', 'Media')[value[2]]
print(
f'### START: codec={value[1]}, audio_type={audio_type}, '
f'volume={value[3]}, otherstate={value[4]}'
)
elif opcode == 2:
print('### STOP')
elif opcode == 3:
print(f'### STATUS: connected={value[1]}')
def on_audio_packet(packet: bytes) -> None:
global ws_connection
if ws_connection:
offset = 1
while offset < len(packet):
pcm_data = g722_decoder.decode_frame(packet[offset : offset + 80])
offset += 80
asyncio.get_running_loop().create_task(ws_connection.send(pcm_data))
else:
logging.info("No active client")
# Respond with a status
asyncio.create_task(
device.notify_subscribers(audio_status_characteristic, force=True)
)
# Handler for volume control
def on_volume_write(_connection, value):
print('--- VOLUME Write:', value[0])
# Register an L2CAP CoC server
def on_coc(channel):
def on_data(data):
print('<<< Voice data received:', data.hex())
audio_out.write(data)
channel.sink = on_data
server = device.create_l2cap_server(
spec=l2cap.LeCreditBasedChannelSpec(max_credits=8), handler=on_coc
)
print(f'### LE_PSM_OUT = {server.psm}')
# Add the ASHA service to the GATT server
read_only_properties_characteristic = Characteristic(
ASHA_READ_ONLY_PROPERTIES_CHARACTERISTIC,
Characteristic.Properties.READ,
Characteristic.READABLE,
bytes(
[
0x01, # Version
0x00, # Device Capabilities [Left, Monaural]
0x01,
0x02,
0x03,
0x04,
0x05,
0x06,
0x07,
0x08, # HiSyncId
0x01, # Feature Map [LE CoC audio output streaming supported]
0x00,
0x00, # Render Delay
0x00,
0x00, # RFU
0x02,
0x00, # Codec IDs [G.722 at 16 kHz]
]
),
)
audio_control_point_characteristic = Characteristic(
ASHA_AUDIO_CONTROL_POINT_CHARACTERISTIC,
Characteristic.Properties.WRITE | Characteristic.WRITE_WITHOUT_RESPONSE,
Characteristic.WRITEABLE,
CharacteristicValue(write=on_audio_control_point_write),
)
audio_status_characteristic = Characteristic(
ASHA_AUDIO_STATUS_CHARACTERISTIC,
Characteristic.Properties.READ | Characteristic.Properties.NOTIFY,
Characteristic.READABLE,
bytes([0]),
)
volume_characteristic = Characteristic(
ASHA_VOLUME_CHARACTERISTIC,
Characteristic.WRITE_WITHOUT_RESPONSE,
Characteristic.WRITEABLE,
CharacteristicValue(write=on_volume_write),
)
le_psm_out_characteristic = Characteristic(
ASHA_LE_PSM_OUT_CHARACTERISTIC,
Characteristic.Properties.READ,
Characteristic.READABLE,
struct.pack('<H', server.psm),
)
device.add_service(
Service(
ASHA_SERVICE,
[
read_only_properties_characteristic,
audio_control_point_characteristic,
audio_status_characteristic,
volume_characteristic,
le_psm_out_characteristic,
],
)
asha_service = asha.AshaService(
capability=0,
hisyncid=b'\x01\x02\x03\x04\x05\x06\x07\x08',
device=device,
audio_sink=on_audio_packet,
)
device.add_service(asha_service)
# Set the advertising data
device.advertising_data = bytes(
AdvertisingData(
[
(AdvertisingData.COMPLETE_LOCAL_NAME, bytes(device.name, 'utf-8')),
(AdvertisingData.FLAGS, bytes([0x06])),
(
AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
bytes(ASHA_SERVICE),
),
(
AdvertisingData.SERVICE_DATA_16_BIT_UUID,
bytes(ASHA_SERVICE)
+ bytes(
[
0x01, # Protocol Version
0x00, # Capability
0x01,
0x02,
0x03,
0x04, # Truncated HiSyncID
]
advertising_data = (
bytes(
AdvertisingData(
[
(
AdvertisingData.COMPLETE_LOCAL_NAME,
bytes(device.name, 'utf-8'),
),
),
]
(AdvertisingData.FLAGS, bytes([0x06])),
(
AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
bytes(gatt.GATT_ASHA_SERVICE),
),
]
)
)
+ asha_service.get_advertising_data()
)
# Go!
await device.power_on()
await device.start_advertising(auto_restart=True)
await device.create_advertising_set(
auto_restart=True,
advertising_data=advertising_data,
advertising_parameters=AdvertisingParameters(
primary_advertising_interval_min=100,
primary_advertising_interval_max=100,
),
)
await hci_transport.source.wait_for_termination()
await websockets.serve(ws_server, port=8888)
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
logging.basicConfig(
level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper(),
format='%(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
)
asyncio.run(main())
+163
View File
@@ -0,0 +1,163 @@
# Copyright 2021-2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import pytest
import struct
from unittest import mock
from bumble import device as bumble_device
from bumble.profiles import asha
from .test_utils import TwoDevices
# -----------------------------------------------------------------------------
HI_SYNC_ID = b'\x00\x01\x02\x03\x04\x05\x06\x07'
TIMEOUT = 0.1
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_get_only_properties():
devices = TwoDevices()
await devices.setup_connection()
asha_service = asha.AshaService(
hisyncid=HI_SYNC_ID,
device=devices[0],
protocol_version=0x01,
capability=0x02,
feature_map=0x03,
render_delay_milliseconds=0x04,
supported_codecs=0x05,
)
devices[0].add_service(asha_service)
async with bumble_device.Peer(devices.connections[1]) as peer:
asha_client = peer.create_service_proxy(asha.AshaServiceProxy)
assert asha_client
read_only_properties = (
await asha_client.read_only_properties_characteristic.read_value()
)
(
protocol_version,
capabilities,
hi_sync_id,
feature_map,
render_delay_milliseconds,
_,
supported_codecs,
) = struct.unpack("<BB8sBHHH", read_only_properties)
assert protocol_version == 0x01
assert capabilities == 0x02
assert hi_sync_id == HI_SYNC_ID
assert feature_map == 0x03
assert render_delay_milliseconds == 0x04
assert supported_codecs == 0x05
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_get_psm():
devices = TwoDevices()
await devices.setup_connection()
asha_service = asha.AshaService(
hisyncid=HI_SYNC_ID,
device=devices[0],
capability=0,
)
devices[0].add_service(asha_service)
async with bumble_device.Peer(devices.connections[1]) as peer:
asha_client = peer.create_service_proxy(asha.AshaServiceProxy)
assert asha_client
psm = (await asha_client.psm_characteristic.read_value())[0]
assert psm == asha_service.psm
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_write_audio_control_point_start():
devices = TwoDevices()
await devices.setup_connection()
asha_service = asha.AshaService(
hisyncid=HI_SYNC_ID,
device=devices[0],
capability=0,
)
devices[0].add_service(asha_service)
async with bumble_device.Peer(devices.connections[1]) as peer:
asha_client = peer.create_service_proxy(asha.AshaServiceProxy)
assert asha_client
status_notifications = asyncio.Queue()
await asha_client.audio_status_point_characteristic.subscribe(
status_notifications.put_nowait
)
start_cb = mock.MagicMock()
asha_service.on('started', start_cb)
await asha_client.audio_control_point_characteristic.write_value(
bytes(
[asha.OpCode.START, asha.Codec.G_722_16KHZ, asha.AudioType.MEDIA, 0, 1]
)
)
status = (await asyncio.wait_for(status_notifications.get(), TIMEOUT))[0]
assert status == asha.AudioStatus.OK
start_cb.assert_called_once()
assert asha_service.active_codec == asha.Codec.G_722_16KHZ
assert asha_service.volume == 0
assert asha_service.other_state == 1
assert asha_service.audio_type == asha.AudioType.MEDIA
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_write_audio_control_point_stop():
devices = TwoDevices()
await devices.setup_connection()
asha_service = asha.AshaService(
hisyncid=HI_SYNC_ID,
device=devices[0],
capability=0,
)
devices[0].add_service(asha_service)
async with bumble_device.Peer(devices.connections[1]) as peer:
asha_client = peer.create_service_proxy(asha.AshaServiceProxy)
assert asha_client
status_notifications = asyncio.Queue()
await asha_client.audio_status_point_characteristic.subscribe(
status_notifications.put_nowait
)
stop_cb = mock.MagicMock()
asha_service.on('stopped', stop_cb)
await asha_client.audio_control_point_characteristic.write_value(
bytes([asha.OpCode.STOP])
)
status = (await asyncio.wait_for(status_notifications.get(), TIMEOUT))[0]
assert status == asha.AudioStatus.OK
stop_cb.assert_called_once()
assert asha_service.active_codec is None
assert asha_service.volume is None
assert asha_service.other_state is None
assert asha_service.audio_type is None