Merge pull request #442 from zxzxwu/unicast_ad

Implement Unicast Server Advertising Data
This commit is contained in:
zxzxwu
2024-03-27 16:54:06 +08:00
committed by GitHub
3 changed files with 44 additions and 4 deletions

View File

@@ -24,8 +24,9 @@ import enum
import struct import struct
import functools import functools
import logging import logging
from typing import Optional, List, Union, Type, Dict, Any, Tuple, cast from typing import Optional, List, Union, Type, Dict, Any, Tuple
from bumble import core
from bumble import colors from bumble import colors
from bumble import device from bumble import device
from bumble import hci from bumble import hci
@@ -228,6 +229,14 @@ class SupportedFrameDuration(enum.IntFlag):
DURATION_10000_US_PREFERRED = 0b0010 DURATION_10000_US_PREFERRED = 0b0010
class AnnouncementType(enum.IntEnum):
'''Basic Audio Profile, 3.5.3. Additional Audio Stream Control Service requirements'''
# fmt: off
GENERAL = 0x00
TARGETED = 0x01
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# ASE Operations # ASE Operations
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -453,6 +462,34 @@ class AudioRole(enum.IntEnum):
SOURCE = hci.HCI_LE_Setup_ISO_Data_Path_Command.Direction.HOST_TO_CONTROLLER SOURCE = hci.HCI_LE_Setup_ISO_Data_Path_Command.Direction.HOST_TO_CONTROLLER
@dataclasses.dataclass
class UnicastServerAdvertisingData:
"""Advertising Data for ASCS."""
announcement_type: AnnouncementType = AnnouncementType.TARGETED
available_audio_contexts: ContextType = ContextType.MEDIA
metadata: bytes = b''
def __bytes__(self) -> bytes:
return bytes(
core.AdvertisingData(
[
(
core.AdvertisingData.SERVICE_DATA_16_BIT_UUID,
struct.pack(
'<2sBIB',
gatt.GATT_AUDIO_STREAM_CONTROL_SERVICE.to_bytes(),
self.announcement_type,
self.available_audio_contexts,
len(self.metadata),
)
+ self.metadata,
)
]
)
)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Utils # Utils
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

View File

@@ -22,14 +22,14 @@ import os
import struct import struct
import secrets import secrets
from bumble.core import AdvertisingData from bumble.core import AdvertisingData
from bumble.device import Device, CisLink, AdvertisingParameters from bumble.device import Device, CisLink
from bumble.hci import ( from bumble.hci import (
CodecID, CodecID,
CodingFormat, CodingFormat,
OwnAddressType,
HCI_IsoDataPacket, HCI_IsoDataPacket,
) )
from bumble.profiles.bap import ( from bumble.profiles.bap import (
UnicastServerAdvertisingData,
CodecSpecificCapabilities, CodecSpecificCapabilities,
ContextType, ContextType,
AudioLocation, AudioLocation,
@@ -141,6 +141,7 @@ async def main() -> None:
) )
) )
+ csis.get_advertising_data() + csis.get_advertising_data()
+ bytes(UnicastServerAdvertisingData())
) )
subprocess = await asyncio.create_subprocess_shell( subprocess = await asyncio.create_subprocess_shell(
f'dlc3 | ffplay pipe:0', f'dlc3 | ffplay pipe:0',
@@ -178,7 +179,7 @@ async def main() -> None:
device.once('cis_establishment', on_cis) device.once('cis_establishment', on_cis)
advertising_set = await device.create_advertising_set( await device.create_advertising_set(
advertising_data=advertising_data, advertising_data=advertising_data,
) )

View File

@@ -31,6 +31,7 @@ from bumble.hci import (
OwnAddressType, OwnAddressType,
) )
from bumble.profiles.bap import ( from bumble.profiles.bap import (
UnicastServerAdvertisingData,
CodecSpecificCapabilities, CodecSpecificCapabilities,
ContextType, ContextType,
AudioLocation, AudioLocation,
@@ -151,6 +152,7 @@ async def main() -> None:
) )
) )
+ csis.get_advertising_data() + csis.get_advertising_data()
+ bytes(UnicastServerAdvertisingData())
) )
await device.create_advertising_set( await device.create_advertising_set(