# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import os
import pytest

from bumble.controller import Controller
from bumble.link import LocalLink
from bumble.device import Device
from bumble.host import Host
from bumble.transport import AsyncPipeSink
from bumble.avdtp import (
    AVDTP_IDLE_STATE,
    AVDTP_STREAMING_STATE,
    MediaPacketPump,
    Protocol,
    Listener,
    MediaCodecCapabilities,
    MediaPacket,
    AVDTP_AUDIO_MEDIA_TYPE,
    AVDTP_TSEP_SNK,
    A2DP_SBC_CODEC_TYPE,
)
from bumble.a2dp import (
    SbcMediaCodecInformation,
    SBC_MONO_CHANNEL_MODE,
    SBC_DUAL_CHANNEL_MODE,
    SBC_STEREO_CHANNEL_MODE,
    SBC_JOINT_STEREO_CHANNEL_MODE,
    SBC_LOUDNESS_ALLOCATION_METHOD,
    SBC_SNR_ALLOCATION_METHOD,
)

# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)

# Upper bound, in seconds, on how long a test waits for the expected RTP
# packets to reach the sink. A lost packet then fails the test instead of
# blocking it forever.
_RTP_RECEIVE_TIMEOUT = 5.0


# -----------------------------------------------------------------------------
class TwoDevices:
    """Test fixture: two devices, each with its own controller, on one link."""

    def __init__(self):
        # Connection seen by each device, indexed like `self.devices`.
        self.connections = [None, None]

        # Both controllers are attached to the same in-process link.
        self.link = LocalLink()
        self.controllers = [
            Controller('C1', link=self.link),
            Controller('C2', link=self.link),
        ]
        self.devices = [
            Device(
                address='F0:F1:F2:F3:F4:F5',
                host=Host(self.controllers[0], AsyncPipeSink(self.controllers[0])),
            ),
            Device(
                address='F5:F4:F3:F2:F1:F0',
                host=Host(self.controllers[1], AsyncPipeSink(self.controllers[1])),
            ),
        ]

        self.paired = [None, None]

    def on_connection(self, which, connection):
        """Record `connection` as the one seen by device number `which`."""
        self.connections[which] = connection


# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_self_connection():
    """Connect device 0 to device 1 and check both sides saw a connection."""
    # Create two devices, each with a controller, attached to the same link
    two_devices = TwoDevices()

    # Attach listeners
    two_devices.devices[0].on(
        'connection', lambda connection: two_devices.on_connection(0, connection)
    )
    two_devices.devices[1].on(
        'connection', lambda connection: two_devices.on_connection(1, connection)
    )

    # Start
    await two_devices.devices[0].power_on()
    await two_devices.devices[1].power_on()

    # Connect the two devices
    await two_devices.devices[0].connect(two_devices.devices[1].random_address)

    # Check the post conditions
    assert two_devices.connections[0] is not None
    assert two_devices.connections[1] is not None


# -----------------------------------------------------------------------------
def source_codec_capabilities():
    """Return the SBC capabilities advertised by the test source endpoint."""
    return MediaCodecCapabilities(
        media_type=AVDTP_AUDIO_MEDIA_TYPE,
        media_codec_type=A2DP_SBC_CODEC_TYPE,
        media_codec_information=SbcMediaCodecInformation.from_discrete_values(
            sampling_frequency=44100,
            channel_mode=SBC_JOINT_STEREO_CHANNEL_MODE,
            block_length=16,
            subbands=8,
            allocation_method=SBC_LOUDNESS_ALLOCATION_METHOD,
            minimum_bitpool_value=2,
            maximum_bitpool_value=53,
        ),
    )


# -----------------------------------------------------------------------------
def sink_codec_capabilities():
    """Return the SBC capabilities advertised by the test sink endpoint."""
    return MediaCodecCapabilities(
        media_type=AVDTP_AUDIO_MEDIA_TYPE,
        media_codec_type=A2DP_SBC_CODEC_TYPE,
        media_codec_information=SbcMediaCodecInformation.from_lists(
            sampling_frequencies=[48000, 44100, 32000, 16000],
            channel_modes=[
                SBC_MONO_CHANNEL_MODE,
                SBC_DUAL_CHANNEL_MODE,
                SBC_STEREO_CHANNEL_MODE,
                SBC_JOINT_STEREO_CHANNEL_MODE,
            ],
            block_lengths=[4, 8, 12, 16],
            subbands=[4, 8],
            allocation_methods=[
                SBC_LOUDNESS_ALLOCATION_METHOD,
                SBC_SNR_ALLOCATION_METHOD,
            ],
            minimum_bitpool_value=2,
            maximum_bitpool_value=53,
        ),
    )


# -----------------------------------------------------------------------------
def _assert_streaming(stream, sink):
    """Assert that `stream` and the remote `sink` are both in streaming state."""
    assert stream.state == AVDTP_STREAMING_STATE
    assert stream.local_endpoint.in_use == 1
    assert stream.rtp_channel is not None
    assert sink.in_use == 1
    assert sink.stream is not None
    assert sink.stream.state == AVDTP_STREAMING_STATE


# -----------------------------------------------------------------------------
def _assert_idle(stream, source, sink):
    """Assert that a closed `stream` left `source` and `sink` idle and unused."""
    assert stream.rtp_channel is None
    assert source.in_use == 0
    assert source.stream.state == AVDTP_IDLE_STATE
    assert sink.in_use == 0
    assert sink.stream.state == AVDTP_IDLE_STATE


# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_source_sink_1():
    """Stream RTP packets from a source to a sink, twice.

    The first pass feeds the stream from a `MediaPacketPump`; the second pass
    sends packets by hand with `send_media_packet`. Each pass checks the
    endpoint states after start and after close, and that the sink received
    the expected number of packets.
    """
    two_devices = TwoDevices()
    await two_devices.devices[0].power_on()
    await two_devices.devices[1].power_on()

    # `rtp_packets`, `rtp_packets_expected` and `rtp_packets_fully_received`
    # are bound further down (and re-bound for the second pass); this callback
    # reads whatever they currently refer to when a packet arrives.
    def on_rtp_packet(packet):
        rtp_packets.append(packet)
        if len(rtp_packets) == rtp_packets_expected:
            rtp_packets_fully_received.set_result(None)

    sink = None

    def on_avdtp_connection(server):
        nonlocal sink
        sink = server.add_sink(sink_codec_capabilities())
        sink.on('rtp_packet', on_rtp_packet)

    # Create a listener to wait for AVDTP connections
    listener = Listener(Listener.create_registrar(two_devices.devices[1]))
    listener.on('connection', on_avdtp_connection)

    connection = await two_devices.devices[0].connect(
        two_devices.devices[1].random_address
    )
    client = await Protocol.connect(connection)
    endpoints = await client.discover_remote_endpoints()
    assert len(endpoints) == 1
    remote_sink = list(endpoints)[0]
    assert remote_sink.in_use == 0
    assert remote_sink.media_type == AVDTP_AUDIO_MEDIA_TYPE
    assert remote_sink.tsep == AVDTP_TSEP_SNK

    async def generate_packets(packet_count):
        # Sequence numbers count up from 0 and timestamps advance by 10 per
        # packet, exactly as two separately maintained counters would.
        for sequence_number in range(packet_count):
            timestamp = sequence_number * 10
            payload = bytes([sequence_number % 256])
            packet = MediaPacket(
                2, 0, 0, 0, sequence_number, timestamp, 0, [], 96, payload
            )
            packet.timestamp_seconds = timestamp / 44100
            yield packet

    # Send packets using a pump object
    rtp_packets_fully_received = asyncio.get_running_loop().create_future()
    rtp_packets_expected = 3
    rtp_packets = []
    pump = MediaPacketPump(generate_packets(rtp_packets_expected))
    source = client.add_source(source_codec_capabilities(), pump)
    stream = await client.create_stream(source, remote_sink)
    await stream.start()
    _assert_streaming(stream, sink)

    await asyncio.wait_for(rtp_packets_fully_received, _RTP_RECEIVE_TIMEOUT)
    await stream.close()
    _assert_idle(stream, source, sink)

    # Send packets manually
    rtp_packets_fully_received = asyncio.get_running_loop().create_future()
    rtp_packets_expected = 3
    rtp_packets = []
    source_packets = [
        MediaPacket(2, 0, 0, 0, i, i * 10, 0, [], 96, bytes([i]))
        for i in range(rtp_packets_expected)
    ]
    source = client.add_source(source_codec_capabilities(), None)
    stream = await client.create_stream(source, remote_sink)
    await stream.start()
    _assert_streaming(stream, sink)

    for source_packet in source_packets:
        stream.send_media_packet(source_packet)

    # Wait until the sink has reported every packet before closing, rather
    # than relying on them happening to arrive while the close is in flight.
    await asyncio.wait_for(rtp_packets_fully_received, _RTP_RECEIVE_TIMEOUT)

    await stream.close()
    assert len(rtp_packets) == rtp_packets_expected
    _assert_idle(stream, source, sink)


# -----------------------------------------------------------------------------
async def run_test_self():
    """Run every test in this module in sequence, without pytest."""
    await test_self_connection()
    await test_source_sink_1()


# -----------------------------------------------------------------------------
if __name__ == '__main__':
    logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
    asyncio.run(run_test_self())