diff --git a/apps/speaker/__init__.py b/apps/speaker/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/apps/speaker/logo.svg b/apps/speaker/logo.svg new file mode 100644 index 00000000..70ef7a90 --- /dev/null +++ b/apps/speaker/logo.svg @@ -0,0 +1,42 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/apps/speaker/speaker.css b/apps/speaker/speaker.css new file mode 100644 index 00000000..075068b0 --- /dev/null +++ b/apps/speaker/speaker.css @@ -0,0 +1,76 @@ +body, h1, h2, h3, h4, h5, h6 { + font-family: sans-serif; +} + +#controlsDiv { + margin: 6px; +} + +#connectionText { + background-color: rgb(239, 89, 75); + border: none; + border-radius: 4px; + padding: 8px; + display: inline-block; + margin: 4px; +} + +#startButton { + padding: 4px; + margin: 6px; +} + +#fftCanvas { + border-radius: 16px; + margin: 6px; +} + +#bandwidthCanvas { + border: grey; + border-style: solid; + border-radius: 8px; + margin: 6px; +} + +#streamStateText { + background-color: rgb(93, 165, 93); + border: none; + border-radius: 8px; + padding: 10px 20px; + display: inline-block; + margin: 6px; +} + +#connectionStateText { + background-color: rgb(112, 146, 206); + border: none; + border-radius: 8px; + padding: 10px 20px; + display: inline-block; + margin: 6px; +} + +#propertiesTable { + border: grey; + border-style: solid; + border-radius: 4px; + padding: 4px; + margin: 6px; + margin-left: 0px; +} + +th, td { + padding-left: 6px; + padding-right: 6px; +} + +.properties td:nth-child(even) { + background-color: #D6EEEE; + font-family: monospace; +} + +.properties td:nth-child(odd) { + font-weight: bold; +} + +.properties tr td:nth-child(2) { width: 150px; } \ No newline at end of file diff --git a/apps/speaker/speaker.html b/apps/speaker/speaker.html new file mode 100644 index 00000000..f68abccb --- /dev/null +++ b/apps/speaker/speaker.html @@ -0,0 +1,34 @@ + + + + Bumble Speaker + + + + +

Bumble Virtual Speaker

+
+
+ + + +
+ + + + +
Codec
Packets
Bytes
+
+ Bandwidth Graph +
+ IDLE + NOT CONNECTED +
+ + +
+ Audio Frequencies Animation + +
+ + \ No newline at end of file diff --git a/apps/speaker/speaker.js b/apps/speaker/speaker.js new file mode 100644 index 00000000..77cb1ff3 --- /dev/null +++ b/apps/speaker/speaker.js @@ -0,0 +1,315 @@ +(function () { + 'use strict'; + +const channelUrl = ((window.location.protocol === "https:") ? "wss://" : "ws://") + window.location.host + "/channel"; +let channelSocket; +let connectionText; +let codecText; +let packetsReceivedText; +let bytesReceivedText; +let streamStateText; +let connectionStateText; +let controlsDiv; +let audioOnButton; +let mediaSource; +let sourceBuffer; +let audioElement; +let audioContext; +let audioAnalyzer; +let audioFrequencyBinCount; +let audioFrequencyData; +let packetsReceived = 0; +let bytesReceived = 0; +let audioState = "stopped"; +let streamState = "IDLE"; +let audioSupportMessageText; +let fftCanvas; +let fftCanvasContext; +let bandwidthCanvas; +let bandwidthCanvasContext; +let bandwidthBinCount; +let bandwidthBins = []; + +const FFT_WIDTH = 800; +const FFT_HEIGHT = 256; +const BANDWIDTH_WIDTH = 500; +const BANDWIDTH_HEIGHT = 100; + +function hexToBytes(hex) { + return Uint8Array.from(hex.match(/.{1,2}/g).map((byte) => parseInt(byte, 16))); +} + +function init() { + initUI(); + initMediaSource(); + initAudioElement(); + initAnalyzer(); + + connect(); +} + +function initUI() { + controlsDiv = document.getElementById("controlsDiv"); + controlsDiv.style.visibility = "hidden"; + connectionText = document.getElementById("connectionText"); + audioOnButton = document.getElementById("audioOnButton"); + codecText = document.getElementById("codecText"); + packetsReceivedText = document.getElementById("packetsReceivedText"); + bytesReceivedText = document.getElementById("bytesReceivedText"); + streamStateText = document.getElementById("streamStateText"); + connectionStateText = document.getElementById("connectionStateText"); + audioSupportMessageText = document.getElementById("audioSupportMessageText"); + + audioOnButton.onclick = () => startAudio(); + + setConnectionText(""); + + requestAnimationFrame(onAnimationFrame); +} + +function initMediaSource() { + mediaSource = new MediaSource(); + mediaSource.onsourceopen = onMediaSourceOpen; + mediaSource.onsourceclose = onMediaSourceClose; + mediaSource.onsourceended = onMediaSourceEnd; +} + +function initAudioElement() { + audioElement = document.getElementById("audio"); + audioElement.src = URL.createObjectURL(mediaSource); + // audioElement.controls = true; +} + +function initAnalyzer() { + fftCanvas = document.getElementById("fftCanvas"); + fftCanvas.width = FFT_WIDTH + fftCanvas.height = FFT_HEIGHT + fftCanvasContext = fftCanvas.getContext('2d'); + fftCanvasContext.fillStyle = "rgb(0, 0, 0)"; + fftCanvasContext.fillRect(0, 0, FFT_WIDTH, FFT_HEIGHT); + + bandwidthCanvas = document.getElementById("bandwidthCanvas"); + bandwidthCanvas.width = BANDWIDTH_WIDTH + bandwidthCanvas.height = BANDWIDTH_HEIGHT + bandwidthCanvasContext = bandwidthCanvas.getContext('2d'); + bandwidthCanvasContext.fillStyle = "rgb(255, 255, 255)"; + bandwidthCanvasContext.fillRect(0, 0, BANDWIDTH_WIDTH, BANDWIDTH_HEIGHT); +} + +function startAnalyzer() { + // FFT + if (audioElement.captureStream !== undefined) { + audioContext = new AudioContext(); + audioAnalyzer = audioContext.createAnalyser(); + audioAnalyzer.fftSize = 128; + audioFrequencyBinCount = audioAnalyzer.frequencyBinCount; + audioFrequencyData = new Uint8Array(audioFrequencyBinCount); + const stream = audioElement.captureStream(); + const source = audioContext.createMediaStreamSource(stream); + source.connect(audioAnalyzer); + } + + // Bandwidth + bandwidthBinCount = BANDWIDTH_WIDTH / 2; + bandwidthBins = []; +} + +function setConnectionText(message) { + connectionText.innerText = message; + if (message.length == 0) { + connectionText.style.display = "none"; + } else { + connectionText.style.display = "inline-block"; + } +} + +function setStreamState(state) { + streamState = state; + streamStateText.innerText = streamState; +} + +function onAnimationFrame() { + // FFT + if (audioAnalyzer !== undefined) { + audioAnalyzer.getByteFrequencyData(audioFrequencyData); + fftCanvasContext.fillStyle = "rgb(0, 0, 0)"; + fftCanvasContext.fillRect(0, 0, FFT_WIDTH, FFT_HEIGHT); + const barCount = audioFrequencyBinCount; + const barWidth = (FFT_WIDTH / audioFrequencyBinCount) - 1; + for (let bar = 0; bar < barCount; bar++) { + const barHeight = audioFrequencyData[bar]; + fftCanvasContext.fillStyle = `rgb(${barHeight / 256 * 200 + 50}, 50, ${50 + 2 * bar})`; + fftCanvasContext.fillRect(bar * (barWidth + 1), FFT_HEIGHT - barHeight, barWidth, barHeight); + } + } + + // Bandwidth + bandwidthCanvasContext.fillStyle = "rgb(255, 255, 255)"; + bandwidthCanvasContext.fillRect(0, 0, BANDWIDTH_WIDTH, BANDWIDTH_HEIGHT); + bandwidthCanvasContext.fillStyle = `rgb(100, 100, 100)`; + for (let t = 0; t < bandwidthBins.length; t++) { + const lineHeight = (bandwidthBins[t] / 1000) * BANDWIDTH_HEIGHT; + bandwidthCanvasContext.fillRect(t * 2, BANDWIDTH_HEIGHT - lineHeight, 2, lineHeight); + } + + // Display again at the next frame + requestAnimationFrame(onAnimationFrame); +} + +function onMediaSourceOpen() { + console.log(this.readyState); + sourceBuffer = mediaSource.addSourceBuffer("audio/aac"); +} + +function onMediaSourceClose() { + console.log(this.readyState); +} + +function onMediaSourceEnd() { + console.log(this.readyState); +} + +async function startAudio() { + try { + console.log("starting audio..."); + audioOnButton.disabled = true; + audioState = "starting"; + await audioElement.play(); + console.log("audio started"); + audioState = "playing"; + startAnalyzer(); + } catch(error) { + console.error(`play failed: ${error}`); + audioState = "stopped"; + audioOnButton.disabled = false; + } +} + +function onAudioPacket(packet) { + if (audioState != "stopped") { + // Queue the audio packet. + sourceBuffer.appendBuffer(packet); + } + + packetsReceived += 1; + packetsReceivedText.innerText = packetsReceived; + bytesReceived += packet.byteLength; + bytesReceivedText.innerText = bytesReceived; + + bandwidthBins[bandwidthBins.length] = packet.byteLength; + if (bandwidthBins.length > bandwidthBinCount) { + bandwidthBins.shift(); + } +} + +function onChannelOpen() { + console.log('channel OPEN'); + setConnectionText(""); + controlsDiv.style.visibility = "visible"; + + // Handshake with the backend. + sendMessage({ + type: "hello" + }); +} + +function onChannelClose() { + console.log('channel CLOSED'); + setConnectionText("Connection to CLI app closed, restart it and reload this page."); + controlsDiv.style.visibility = "hidden"; +} + +function onChannelError(error) { + console.log(`channel ERROR: ${error}`); + setConnectionText(`Connection to CLI app error ({${error}}), restart it and reload this page.`); + controlsDiv.style.visibility = "hidden"; +} + +function onChannelMessage(message) { + if (typeof message.data === 'string' || message.data instanceof String) { + // JSON message. + const jsonMessage = JSON.parse(message.data); + console.log(`channel MESSAGE: ${message.data}`); + + // Dispatch the message. + const handlerName = `on${jsonMessage.type.charAt(0).toUpperCase()}${jsonMessage.type.slice(1)}Message` + const handler = messageHandlers[handlerName]; + if (handler !== undefined) { + const params = jsonMessage.params; + if (params === undefined) { + params = {}; + } + handler(params); + } else { + console.warn(`unhandled message: ${jsonMessage.type}`) + } + } else { + // BINARY audio data. + onAudioPacket(message.data); + } +} + +function onHelloMessage(params) { + codecText.innerText = params.codec; + if (params.codec != "aac") { + audioOnButton.disabled = true; + audioSupportMessageText.innerText = "Only AAC can be played, audio will be disabled"; + audioSupportMessageText.style.display = "inline-block"; + } else { + audioSupportMessageText.innerText = ""; + audioSupportMessageText.style.display = "none"; + } + if (params.streamState) { + setStreamState(params.streamState); + } +} + +function onStartMessage(params) { + setStreamState("STARTED"); +} + +function onStopMessage(params) { + setStreamState("STOPPED"); +} + +function onSuspendMessage(params) { + setStreamState("SUSPENDED"); +} + +function onConnectionMessage(params) { + connectionStateText.innerText = `CONNECTED: ${params.peer_name} (${params.peer_address})`; +} + +function onDisconnectionMessage(params) { + connectionStateText.innerText = "DISCONNECTED"; +} + +function sendMessage(message) { + channelSocket.send(JSON.stringify(message)); +} + +function connect() { + console.log("connecting to CLI app"); + + channelSocket = new WebSocket(channelUrl); + channelSocket.binaryType = "arraybuffer"; + channelSocket.onopen = onChannelOpen; + channelSocket.onclose = onChannelClose; + channelSocket.onerror = onChannelError; + channelSocket.onmessage = onChannelMessage; +} + +const messageHandlers = { + onHelloMessage, + onStartMessage, + onStopMessage, + onSuspendMessage, + onConnectionMessage, + onDisconnectionMessage +} + +window.onload = (event) => { + init(); +} + +}()); \ No newline at end of file diff --git a/apps/speaker/speaker.py b/apps/speaker/speaker.py new file mode 100644 index 00000000..a2907d49 --- /dev/null +++ b/apps/speaker/speaker.py @@ -0,0 +1,747 @@ +# Copyright 2021-2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +from __future__ import annotations +import asyncio +import asyncio.subprocess +from importlib import resources +import enum +import json +import os +import logging +import pathlib +import subprocess +from typing import Dict, List, Optional +import weakref + +import click +import aiohttp +from aiohttp import web + +import bumble +from bumble.colors import color +from bumble.core import BT_BR_EDR_TRANSPORT, CommandTimeoutError +from bumble.device import Connection, Device, DeviceConfiguration +from bumble.hci import HCI_StatusError +from bumble.pairing import PairingConfig +from bumble.sdp import ServiceAttribute +from bumble.transport import open_transport +from bumble.avdtp import ( + AVDTP_AUDIO_MEDIA_TYPE, + Listener, + MediaCodecCapabilities, + MediaPacket, + Protocol, +) +from bumble.a2dp import ( + MPEG_2_AAC_LC_OBJECT_TYPE, + make_audio_sink_service_sdp_records, + A2DP_SBC_CODEC_TYPE, + A2DP_MPEG_2_4_AAC_CODEC_TYPE, + SBC_MONO_CHANNEL_MODE, + SBC_DUAL_CHANNEL_MODE, + SBC_SNR_ALLOCATION_METHOD, + SBC_LOUDNESS_ALLOCATION_METHOD, + SBC_STEREO_CHANNEL_MODE, + SBC_JOINT_STEREO_CHANNEL_MODE, + SbcMediaCodecInformation, + AacMediaCodecInformation, +) +from bumble.utils import AsyncRunner +from bumble.codecs import AacAudioRtpPacket + + +# ----------------------------------------------------------------------------- +# Logging +# ----------------------------------------------------------------------------- +logger = logging.getLogger(__name__) + + +# ----------------------------------------------------------------------------- +# Constants +# ----------------------------------------------------------------------------- +DEFAULT_UI_PORT = 7654 + +# ----------------------------------------------------------------------------- +class AudioExtractor: + @staticmethod + def create(codec: str): + if codec == 'aac': + return AacAudioExtractor() + if codec == 'sbc': + return SbcAudioExtractor() + + def extract_audio(self, packet: MediaPacket) -> bytes: + raise NotImplementedError() + + +# ----------------------------------------------------------------------------- +class AacAudioExtractor: + def extract_audio(self, packet: MediaPacket) -> bytes: + return AacAudioRtpPacket(packet.payload).to_adts() + + +# ----------------------------------------------------------------------------- +class SbcAudioExtractor: + def extract_audio(self, packet: MediaPacket) -> bytes: + # header = packet.payload[0] + # fragmented = header >> 7 + # start = (header >> 6) & 0x01 + # last = (header >> 5) & 0x01 + # number_of_frames = header & 0x0F + + # TODO: support fragmented payloads + return packet.payload[1:] + + +# ----------------------------------------------------------------------------- +class Output: + async def start(self) -> None: + pass + + async def stop(self) -> None: + pass + + async def suspend(self) -> None: + pass + + async def on_connection(self, connection: Connection) -> None: + pass + + async def on_disconnection(self, reason: int) -> None: + pass + + def on_rtp_packet(self, packet: MediaPacket) -> None: + pass + + +# ----------------------------------------------------------------------------- +class FileOutput(Output): + filename: str + codec: str + extractor: AudioExtractor + + def __init__(self, filename, codec): + self.filename = filename + self.codec = codec + self.file = open(filename, 'wb') + self.extractor = AudioExtractor.create(codec) + + def on_rtp_packet(self, packet: MediaPacket) -> None: + self.file.write(self.extractor.extract_audio(packet)) + + +# ----------------------------------------------------------------------------- +class QueuedOutput(Output): + MAX_QUEUE_SIZE = 32768 + + packets: asyncio.Queue + extractor: AudioExtractor + packet_pump_task: Optional[asyncio.Task] + started: bool + + def __init__(self, extractor): + self.extractor = extractor + self.packets = asyncio.Queue() + self.packet_pump_task = None + self.started = False + + async def start(self): + if self.started: + return + + self.packet_pump_task = asyncio.create_task(self.pump_packets()) + + async def pump_packets(self): + while True: + packet = await self.packets.get() + await self.on_audio_packet(packet) + + async def on_audio_packet(self, packet: bytes) -> None: + pass + + def on_rtp_packet(self, packet: MediaPacket) -> None: + if self.packets.qsize() > self.MAX_QUEUE_SIZE: + logger.debug("queue full, dropping") + return + + self.packets.put_nowait(self.extractor.extract_audio(packet)) + + +# ----------------------------------------------------------------------------- +class WebSocketOutput(QueuedOutput): + def __init__(self, codec, send_audio, send_message): + super().__init__(AudioExtractor.create(codec)) + self.send_audio = send_audio + self.send_message = send_message + + async def on_connection(self, connection: Connection) -> None: + try: + await connection.request_remote_name() + except HCI_StatusError: + pass + peer_name = '' if connection.peer_name is None else connection.peer_name + peer_address = str(connection.peer_address).replace('/P', '') + await self.send_message( + 'connection', + peer_address=peer_address, + peer_name=peer_name, + ) + + async def on_disconnection(self, reason) -> None: + await self.send_message('disconnection') + + async def on_audio_packet(self, packet: bytes) -> None: + await self.send_audio(packet) + + async def start(self): + await super().start() + await self.send_message('start') + + async def stop(self): + await super().stop() + await self.send_message('stop') + + async def suspend(self): + await super().suspend() + await self.send_message('suspend') + + +# ----------------------------------------------------------------------------- +class FfplayOutput(QueuedOutput): + MAX_QUEUE_SIZE = 32768 + + subprocess: Optional[asyncio.subprocess.Process] + ffplay_task: Optional[asyncio.Task] + + def __init__(self) -> None: + super().__init__(AacAudioExtractor()) + self.subprocess = None + self.ffplay_task = None + + async def start(self): + if self.started: + return + + await super().start() + + self.subprocess = await asyncio.create_subprocess_shell( + 'ffplay -acodec aac pipe:0', + stdin=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + self.ffplay_task = asyncio.create_task(self.monitor_ffplay()) + + async def stop(self): + # TODO + pass + + async def suspend(self): + # TODO + pass + + async def monitor_ffplay(self): + async def read_stream(name, stream): + while True: + data = await stream.read() + logger.debug(f'{name}:', data) + + await asyncio.wait( + [ + asyncio.create_task( + read_stream('[ffplay stdout]', self.subprocess.stdout) + ), + asyncio.create_task( + read_stream('[ffplay stderr]', self.subprocess.stderr) + ), + asyncio.create_task(self.subprocess.wait()), + ] + ) + logger.debug("FFPLAY done") + + async def on_audio_packet(self, packet): + try: + self.subprocess.stdin.write(packet) + except Exception: + logger.warning('!!!! exception while sending audio to ffplay pipe') + + +# ----------------------------------------------------------------------------- +class UiServer: + speaker: weakref.ReferenceType[Speaker] + port: int + + def __init__(self, speaker: Speaker, port: int) -> None: + self.speaker = weakref.ref(speaker) + self.port = port + self.channel_socket = None + + async def start_http(self) -> None: + """Start the UI HTTP server.""" + + app = web.Application() + app.add_routes( + [ + web.get('/', self.get_static), + web.get('/speaker.html', self.get_static), + web.get('/speaker.js', self.get_static), + web.get('/speaker.css', self.get_static), + web.get('/logo.svg', self.get_static), + web.get('/channel', self.get_channel), + ] + ) + + runner = web.AppRunner(app) + await runner.setup() + site = web.TCPSite(runner, 'localhost', self.port) + print('UI HTTP server at ' + color(f'http://127.0.0.1:{self.port}', 'green')) + await site.start() + + async def get_static(self, request): + path = request.path + if path == '/': + path = '/speaker.html' + if path.endswith('.html'): + content_type = 'text/html' + elif path.endswith('.js'): + content_type = 'text/javascript' + elif path.endswith('.css'): + content_type = 'text/css' + elif path.endswith('.svg'): + content_type = 'image/svg+xml' + else: + content_type = 'text/plain' + text = ( + resources.files("bumble.apps.speaker") + .joinpath(pathlib.Path(path).relative_to('/')) + .read_text(encoding="utf-8") + ) + return aiohttp.web.Response(text=text, content_type=content_type) + + async def get_channel(self, request): + ws = web.WebSocketResponse() + await ws.prepare(request) + + # Process messages until the socket is closed. + self.channel_socket = ws + async for message in ws: + if message.type == aiohttp.WSMsgType.TEXT: + logger.debug(f'<<< received message: {message.data}') + await self.on_message(message.data) + elif message.type == aiohttp.WSMsgType.ERROR: + logger.debug( + f'channel connection closed with exception {ws.exception()}' + ) + + self.channel_socket = None + logger.debug('--- channel connection closed') + + return ws + + async def on_message(self, message_str: str): + # Parse the message as JSON + message = json.loads(message_str) + + # Dispatch the message + message_type = message['type'] + message_params = message.get('params', {}) + handler = getattr(self, f'on_{message_type}_message') + if handler: + await handler(**message_params) + + async def on_hello_message(self): + await self.send_message( + 'hello', + bumble_version=bumble.__version__, + codec=self.speaker().codec, + streamState=self.speaker().stream_state.name, + ) + if connection := self.speaker().connection: + await self.send_message( + 'connection', + peer_address=str(connection.peer_address).replace('/P', ''), + peer_name=connection.peer_name, + ) + + async def send_message(self, message_type: str, **kwargs) -> None: + if self.channel_socket is None: + return + + message = {'type': message_type, 'params': kwargs} + await self.channel_socket.send_json(message) + + async def send_audio(self, data: bytes) -> None: + if self.channel_socket is None: + return + + try: + await self.channel_socket.send_bytes(data) + except Exception as error: + logger.warning(f'exception while sending audio packet: {error}') + + +# ----------------------------------------------------------------------------- +class Speaker: + class StreamState(enum.Enum): + IDLE = 0 + STOPPED = 1 + STARTED = 2 + SUSPENDED = 3 + + def __init__(self, device_config, transport, codec, discover, outputs, ui_port): + self.device_config = device_config + self.transport = transport + self.codec = codec + self.discover = discover + self.ui_port = ui_port + self.device = None + self.connection = None + self.listener = None + self.packets_received = 0 + self.bytes_received = 0 + self.stream_state = Speaker.StreamState.IDLE + self.outputs = [] + for output in outputs: + if output == '@ffplay': + self.outputs.append(FfplayOutput()) + continue + + # Default to FileOutput + self.outputs.append(FileOutput(output, codec)) + + # Create an HTTP server for the UI + self.ui_server = UiServer(speaker=self, port=ui_port) + + def sdp_records(self) -> Dict[int, List[ServiceAttribute]]: + service_record_handle = 0x00010001 + return { + service_record_handle: make_audio_sink_service_sdp_records( + service_record_handle + ) + } + + def codec_capabilities(self) -> MediaCodecCapabilities: + if self.codec == 'aac': + return self.aac_codec_capabilities() + + if self.codec == 'sbc': + return self.sbc_codec_capabilities() + + raise RuntimeError('unsupported codec') + + def aac_codec_capabilities(self) -> MediaCodecCapabilities: + return MediaCodecCapabilities( + media_type=AVDTP_AUDIO_MEDIA_TYPE, + media_codec_type=A2DP_MPEG_2_4_AAC_CODEC_TYPE, + media_codec_information=AacMediaCodecInformation.from_lists( + object_types=[MPEG_2_AAC_LC_OBJECT_TYPE], + sampling_frequencies=[48000, 44100], + channels=[1, 2], + vbr=1, + bitrate=256000, + ), + ) + + def sbc_codec_capabilities(self) -> MediaCodecCapabilities: + return MediaCodecCapabilities( + media_type=AVDTP_AUDIO_MEDIA_TYPE, + media_codec_type=A2DP_SBC_CODEC_TYPE, + media_codec_information=SbcMediaCodecInformation.from_lists( + sampling_frequencies=[48000, 44100, 32000, 16000], + channel_modes=[ + SBC_MONO_CHANNEL_MODE, + SBC_DUAL_CHANNEL_MODE, + SBC_STEREO_CHANNEL_MODE, + SBC_JOINT_STEREO_CHANNEL_MODE, + ], + block_lengths=[4, 8, 12, 16], + subbands=[4, 8], + allocation_methods=[ + SBC_LOUDNESS_ALLOCATION_METHOD, + SBC_SNR_ALLOCATION_METHOD, + ], + minimum_bitpool_value=2, + maximum_bitpool_value=53, + ), + ) + + async def dispatch_to_outputs(self, function): + for output in self.outputs: + await function(output) + + def on_bluetooth_connection(self, connection): + print(f'Connection: {connection}') + self.connection = connection + connection.on('disconnection', self.on_bluetooth_disconnection) + AsyncRunner.spawn( + self.dispatch_to_outputs(lambda output: output.on_connection(connection)) + ) + + def on_bluetooth_disconnection(self, reason): + print(f'Disconnection ({reason})') + self.connection = None + AsyncRunner.spawn(self.advertise()) + AsyncRunner.spawn( + self.dispatch_to_outputs(lambda output: output.on_disconnection(reason)) + ) + + def on_avdtp_connection(self, protocol): + print('Audio Stream Open') + + # Add a sink endpoint to the server + sink = protocol.add_sink(self.codec_capabilities()) + sink.on('start', self.on_sink_start) + sink.on('stop', self.on_sink_stop) + sink.on('suspend', self.on_sink_suspend) + sink.on('configuration', lambda: self.on_sink_configuration(sink.configuration)) + sink.on('rtp_packet', self.on_rtp_packet) + sink.on('rtp_channel_open', self.on_rtp_channel_open) + sink.on('rtp_channel_close', self.on_rtp_channel_close) + + # Listen for close events + protocol.on('close', self.on_avdtp_close) + + # Discover all endpoints on the remote device is requested + if self.discover: + AsyncRunner.spawn(self.discover_remote_endpoints(protocol)) + + def on_avdtp_close(self): + print("Audio Stream Closed") + + def on_sink_start(self): + print("Sink Started\u001b[0K") + self.stream_state = self.StreamState.STARTED + AsyncRunner.spawn(self.dispatch_to_outputs(lambda output: output.start())) + + def on_sink_stop(self): + print("Sink Stopped\u001b[0K") + self.stream_state = self.StreamState.STOPPED + AsyncRunner.spawn(self.dispatch_to_outputs(lambda output: output.stop())) + + def on_sink_suspend(self): + print("Sink Suspended\u001b[0K") + self.stream_state = self.StreamState.SUSPENDED + AsyncRunner.spawn(self.dispatch_to_outputs(lambda output: output.suspend())) + + def on_sink_configuration(self, config): + print("Sink Configuration:") + print('\n'.join([" " + str(capability) for capability in config])) + + def on_rtp_channel_open(self): + print("RTP Channel Open") + + def on_rtp_channel_close(self): + print("RTP Channel Closed") + self.stream_state = self.StreamState.IDLE + + def on_rtp_packet(self, packet): + self.packets_received += 1 + self.bytes_received += len(packet.payload) + print( + f'[{self.bytes_received} bytes in {self.packets_received} packets] {packet}', + end='\r', + ) + + for output in self.outputs: + output.on_rtp_packet(packet) + + async def advertise(self): + await self.device.set_discoverable(True) + await self.device.set_connectable(True) + + async def connect(self, address): + # Connect to the source + print(f'=== Connecting to {address}...') + connection = await self.device.connect(address, transport=BT_BR_EDR_TRANSPORT) + print(f'=== Connected to {connection.peer_address}') + + # Request authentication + print('*** Authenticating...') + await connection.authenticate() + print('*** Authenticated') + + # Enable encryption + print('*** Enabling encryption...') + await connection.encrypt() + print('*** Encryption on') + + protocol = await Protocol.connect(connection) + self.listener.set_server(connection, protocol) + self.on_avdtp_connection(protocol) + + async def discover_remote_endpoints(self, protocol): + endpoints = await protocol.discover_remote_endpoints() + print(f'@@@ Found {len(endpoints)} endpoints') + for endpoint in endpoints: + print('@@@', endpoint) + + async def run(self, connect_address): + await self.ui_server.start_http() + self.outputs.append( + WebSocketOutput( + self.codec, self.ui_server.send_audio, self.ui_server.send_message + ) + ) + + async with await open_transport(self.transport) as (hci_source, hci_sink): + # Create a device + device_config = DeviceConfiguration() + if self.device_config: + device_config.load_from_file(self.device_config) + else: + device_config.name = "Bumble Speaker" + device_config.class_of_device = 0x240414 + device_config.keystore = "JsonKeyStore" + + device_config.classic_enabled = True + device_config.le_enabled = False + self.device = Device.from_config_with_hci( + device_config, hci_source, hci_sink + ) + + # Setup the SDP to expose the sink service + self.device.sdp_service_records = self.sdp_records() + + # Don't require MITM when pairing. + self.device.pairing_config_factory = lambda connection: PairingConfig( + mitm=False + ) + + # Start the controller + await self.device.power_on() + + # Print some of the config/properties + print("Speaker Name:", color(device_config.name, 'yellow')) + print( + "Speaker Bluetooth Address:", + color( + self.device.public_address.to_string(with_type_qualifier=False), + 'yellow', + ), + ) + + # Listen for Bluetooth connections + self.device.on('connection', self.on_bluetooth_connection) + + # Create a listener to wait for AVDTP connections + self.listener = Listener(Listener.create_registrar(self.device)) + self.listener.on('connection', self.on_avdtp_connection) + + print(f'Speaker ready to play, codec={color(self.codec, "cyan")}') + + if connect_address: + # Connect to the source + try: + await self.connect(connect_address) + except CommandTimeoutError: + print(color("Connection timed out", "red")) + return + else: + # Start being discoverable and connectable + print("Waiting for connection...") + await self.advertise() + + await hci_source.wait_for_termination() + + for output in self.outputs: + await output.stop() + + +# ----------------------------------------------------------------------------- +@click.group() +@click.pass_context +def speaker_cli(ctx, device_config): + ctx.ensure_object(dict) + ctx.obj['device_config'] = device_config + + +@click.command() +@click.option( + '--codec', type=click.Choice(['sbc', 'aac']), default='aac', show_default=True +) +@click.option( + '--discover', is_flag=True, help='Discover remote endpoints once connected' +) +@click.option( + '--output', + multiple=True, + metavar='NAME', + help=( + 'Send audio to this named output ' + '(may be used more than once for multiple outputs)' + ), +) +@click.option( + '--ui-port', + 'ui_port', + metavar='HTTP_PORT', + default=DEFAULT_UI_PORT, + show_default=True, + help='HTTP port for the UI server', +) +@click.option( + '--connect', + 'connect_address', + metavar='ADDRESS_OR_NAME', + help='Address or name to connect to', +) +@click.option('--device-config', metavar='FILENAME', help='Device configuration file') +@click.argument('transport') +def speaker( + transport, codec, connect_address, discover, output, ui_port, device_config +): + """Run the speaker.""" + + # ffplay only works with AAC for now + if codec != 'aac' and '@ffplay' in output: + print( + color( + f'{codec} not supported with @ffplay output, ' + '@ffplay output will be skipped', + 'yellow', + ) + ) + output = list(filter(lambda x: x != '@ffplay', output)) + + if '@ffplay' in output: + # Check if ffplay is installed + try: + subprocess.run(['ffplay', '-version'], capture_output=True, check=True) + except FileNotFoundError: + print( + color('ffplay not installed, @ffplay output will be disabled', 'yellow') + ) + output = list(filter(lambda x: x != '@ffplay', output)) + + asyncio.run( + Speaker(device_config, transport, codec, discover, output, ui_port).run( + connect_address + ) + ) + + +# ----------------------------------------------------------------------------- +def main(): + logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper()) + speaker() + + +# ----------------------------------------------------------------------------- +if __name__ == "__main__": + main() # pylint: disable=no-value-for-parameter diff --git a/bumble/a2dp.py b/bumble/a2dp.py index 772846a3..eeecb1ee 100644 --- a/bumble/a2dp.py +++ b/bumble/a2dp.py @@ -432,6 +432,7 @@ class AacMediaCodecInformation( cls.SAMPLING_FREQUENCY_BITS[x] for x in sampling_frequencies ), channels=sum(cls.CHANNELS_BITS[x] for x in channels), + rfa=0, vbr=vbr, bitrate=bitrate, ) diff --git a/bumble/avdtp.py b/bumble/avdtp.py index 238036dc..3988f309 100644 --- a/bumble/avdtp.py +++ b/bumble/avdtp.py @@ -1207,7 +1207,7 @@ class DelayReport_Reject(Simple_Reject): # ----------------------------------------------------------------------------- -class Protocol: +class Protocol(EventEmitter): SINGLE_PACKET = 0 START_PACKET = 1 CONTINUE_PACKET = 2 @@ -1234,6 +1234,7 @@ class Protocol: return protocol def __init__(self, l2cap_channel, version=(1, 3)): + super().__init__() self.l2cap_channel = l2cap_channel self.version = version self.rtx_sig_timer = AVDTP_DEFAULT_RTX_SIG_TIMER @@ -1250,6 +1251,7 @@ class Protocol: # Register to receive PDUs from the channel l2cap_channel.sink = self.on_pdu l2cap_channel.on('open', self.on_l2cap_channel_open) + l2cap_channel.on('close', self.on_l2cap_channel_close) def get_local_endpoint_by_seid(self, seid): if 0 < seid <= len(self.local_endpoints): @@ -1392,11 +1394,18 @@ class Protocol: def on_l2cap_connection(self, channel): # Forward the channel to the endpoint that's expecting it - if self.channel_acceptor: - self.channel_acceptor.on_l2cap_connection(channel) + if self.channel_acceptor is None: + logger.warning(color('!!! l2cap connection with no acceptor', 'red')) + return + self.channel_acceptor.on_l2cap_connection(channel) def on_l2cap_channel_open(self): logger.debug(color('<<< L2CAP channel open', 'magenta')) + self.emit('open') + + def on_l2cap_channel_close(self): + logger.debug(color('<<< L2CAP channel close', 'magenta')) + self.emit('close') def send_message(self, transaction_label, message): logger.debug( @@ -1651,6 +1660,10 @@ class Listener(EventEmitter): def set_server(self, connection, server): self.servers[connection.handle] = server + def remove_server(self, connection): + if connection.handle in self.servers: + del self.servers[connection.handle] + def __init__(self, registrar, version=(1, 3)): super().__init__() self.version = version @@ -1669,11 +1682,17 @@ class Listener(EventEmitter): else: # This is a new command/response channel def on_channel_open(): + logger.debug('setting up new Protocol for the connection') server = Protocol(channel, self.version) self.set_server(channel.connection, server) self.emit('connection', server) + def on_channel_close(): + logger.debug('removing Protocol for the connection') + self.remove_server(channel.connection) + channel.on('open', on_channel_open) + channel.on('close', on_channel_close) # ----------------------------------------------------------------------------- @@ -1967,11 +1986,12 @@ class DiscoveredStreamEndPoint(StreamEndPoint, StreamEndPointProxy): # ----------------------------------------------------------------------------- -class LocalStreamEndPoint(StreamEndPoint): +class LocalStreamEndPoint(StreamEndPoint, EventEmitter): def __init__( self, protocol, seid, media_type, tsep, capabilities, configuration=None ): - super().__init__(seid, media_type, tsep, 0, capabilities) + StreamEndPoint.__init__(self, seid, media_type, tsep, 0, capabilities) + EventEmitter.__init__(self) self.protocol = protocol self.configuration = configuration if configuration is not None else [] self.stream = None @@ -1988,40 +2008,47 @@ class LocalStreamEndPoint(StreamEndPoint): def on_reconfigure_command(self, command): pass + def on_set_configuration_command(self, configuration): + logger.debug( + '<<< received configuration: ' + f'{",".join([str(capability) for capability in configuration])}' + ) + self.configuration = configuration + self.emit('configuration') + def on_get_configuration_command(self): return Get_Configuration_Response(self.configuration) def on_open_command(self): - pass + self.emit('open') def on_start_command(self): - pass + self.emit('start') def on_suspend_command(self): - pass + self.emit('suspend') def on_close_command(self): - pass + self.emit('close') def on_abort_command(self): - pass + self.emit('abort') def on_rtp_channel_open(self): - pass + self.emit('rtp_channel_open') def on_rtp_channel_close(self): - pass + self.emit('rtp_channel_close') # ----------------------------------------------------------------------------- -class LocalSource(LocalStreamEndPoint, EventEmitter): +class LocalSource(LocalStreamEndPoint): def __init__(self, protocol, seid, codec_capabilities, packet_pump): capabilities = [ ServiceCapabilities(AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY), codec_capabilities, ] - LocalStreamEndPoint.__init__( - self, + super().__init__( protocol, seid, codec_capabilities.media_type, @@ -2029,14 +2056,13 @@ class LocalSource(LocalStreamEndPoint, EventEmitter): capabilities, capabilities, ) - EventEmitter.__init__(self) self.packet_pump = packet_pump async def start(self): if self.packet_pump: return await self.packet_pump.start(self.stream.rtp_channel) - self.emit('start', self.stream.rtp_channel) + self.emit('start') async def stop(self): if self.packet_pump: @@ -2044,11 +2070,6 @@ class LocalSource(LocalStreamEndPoint, EventEmitter): self.emit('stop') - def on_set_configuration_command(self, configuration): - # For now, blindly accept the configuration - logger.debug(f'<<< received source configuration: {configuration}') - self.configuration = configuration - def on_start_command(self): asyncio.create_task(self.start()) @@ -2057,30 +2078,28 @@ class LocalSource(LocalStreamEndPoint, EventEmitter): # ----------------------------------------------------------------------------- -class LocalSink(LocalStreamEndPoint, EventEmitter): +class LocalSink(LocalStreamEndPoint): def __init__(self, protocol, seid, codec_capabilities): capabilities = [ ServiceCapabilities(AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY), codec_capabilities, ] - LocalStreamEndPoint.__init__( - self, + super().__init__( protocol, seid, codec_capabilities.media_type, AVDTP_TSEP_SNK, capabilities, ) - EventEmitter.__init__(self) - - def on_set_configuration_command(self, configuration): - # For now, blindly accept the configuration - logger.debug(f'<<< received sink configuration: {configuration}') - self.configuration = configuration def on_rtp_channel_open(self): logger.debug(color('<<< RTP channel open', 'magenta')) self.stream.rtp_channel.sink = self.on_avdtp_packet + super().on_rtp_channel_open() + + def on_rtp_channel_close(self): + logger.debug(color('<<< RTP channel close', 'magenta')) + super().on_rtp_channel_close() def on_avdtp_packet(self, packet): rtp_packet = MediaPacket.from_bytes(packet) diff --git a/bumble/codecs.py b/bumble/codecs.py new file mode 100644 index 00000000..1d7ae82c --- /dev/null +++ b/bumble/codecs.py @@ -0,0 +1,381 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +from __future__ import annotations +from dataclasses import dataclass + + +# ----------------------------------------------------------------------------- +class BitReader: + """Simple but not optimized bit stream reader.""" + + data: bytes + bytes_position: int + bit_position: int + cache: int + bits_cached: int + + def __init__(self, data: bytes): + self.data = data + self.byte_position = 0 + self.bit_position = 0 + self.cache = 0 + self.bits_cached = 0 + + def read(self, bits: int) -> int: + """ "Read up to 32 bits.""" + + if bits > 32: + raise ValueError('maximum read size is 32') + + if self.bits_cached >= bits: + # We have enough bits. + self.bits_cached -= bits + self.bit_position += bits + return (self.cache >> self.bits_cached) & ((1 << bits) - 1) + + # Read more cache, up to 32 bits + feed_bytes = self.data[self.byte_position : self.byte_position + 4] + feed_size = len(feed_bytes) + feed_int = int.from_bytes(feed_bytes, byteorder='big') + if 8 * feed_size + self.bits_cached < bits: + raise ValueError('trying to read past the data') + self.byte_position += feed_size + + # Combine the new cache and the old cache + cache = self.cache & ((1 << self.bits_cached) - 1) + new_bits = bits - self.bits_cached + self.bits_cached = 8 * feed_size - new_bits + result = (feed_int >> self.bits_cached) | (cache << new_bits) + self.cache = feed_int + + self.bit_position += bits + return result + + def read_bytes(self, count: int): + if self.bit_position + 8 * count > 8 * len(self.data): + raise ValueError('not enough data') + + if self.bit_position % 8: + # Not byte aligned + result = bytearray(count) + for i in range(count): + result[i] = self.read(8) + return bytes(result) + + # Byte aligned + self.byte_position = self.bit_position // 8 + self.bits_cached = 0 + self.cache = 0 + offset = self.bit_position // 8 + self.bit_position += 8 * count + return self.data[offset : offset + count] + + def bits_left(self) -> int: + return (8 * len(self.data)) - self.bit_position + + def skip(self, bits: int) -> None: + # Slow, but simple... + while bits: + if bits > 32: + self.read(32) + bits -= 32 + else: + self.read(bits) + break + + +# ----------------------------------------------------------------------------- +class AacAudioRtpPacket: + """AAC payload encapsulated in an RTP packet payload""" + + @staticmethod + def latm_value(reader: BitReader) -> int: + bytes_for_value = reader.read(2) + value = 0 + for _ in range(bytes_for_value + 1): + value = value * 256 + reader.read(8) + return value + + @staticmethod + def program_config_element(reader: BitReader): + raise ValueError('program_config_element not supported') + + @dataclass + class GASpecificConfig: + def __init__( + self, reader: BitReader, channel_configuration: int, audio_object_type: int + ) -> None: + # GASpecificConfig - ISO/EIC 14496-3 Table 4.1 + frame_length_flag = reader.read(1) + depends_on_core_coder = reader.read(1) + if depends_on_core_coder: + self.core_coder_delay = reader.read(14) + extension_flag = reader.read(1) + if not channel_configuration: + AacAudioRtpPacket.program_config_element(reader) + if audio_object_type in (6, 20): + self.layer_nr = reader.read(3) + if extension_flag: + if audio_object_type == 22: + num_of_sub_frame = reader.read(5) + layer_length = reader.read(11) + if audio_object_type in (17, 19, 20, 23): + aac_section_data_resilience_flags = reader.read(1) + aac_scale_factor_data_resilience_flags = reader.read(1) + aac_spectral_data_resilience_flags = reader.read(1) + extension_flag_3 = reader.read(1) + if extension_flag_3 == 1: + raise ValueError('extensionFlag3 == 1 not supported') + + @staticmethod + def audio_object_type(reader: BitReader): + # GetAudioObjectType - ISO/EIC 14496-3 Table 1.16 + audio_object_type = reader.read(5) + if audio_object_type == 31: + audio_object_type = 32 + reader.read(6) + + return audio_object_type + + @dataclass + class AudioSpecificConfig: + audio_object_type: int + sampling_frequency_index: int + sampling_frequency: int + channel_configuration: int + sbr_present_flag: int + ps_present_flag: int + extension_audio_object_type: int + extension_sampling_frequency_index: int + extension_sampling_frequency: int + extension_channel_configuration: int + + SAMPLING_FREQUENCIES = [ + 96000, + 88200, + 64000, + 48000, + 44100, + 32000, + 24000, + 22050, + 16000, + 12000, + 11025, + 8000, + 7350, + ] + + def __init__(self, reader: BitReader) -> None: + # AudioSpecificConfig - ISO/EIC 14496-3 Table 1.15 + self.audio_object_type = AacAudioRtpPacket.audio_object_type(reader) + self.sampling_frequency_index = reader.read(4) + if self.sampling_frequency_index == 0xF: + self.sampling_frequency = reader.read(24) + else: + self.sampling_frequency = self.SAMPLING_FREQUENCIES[ + self.sampling_frequency_index + ] + self.channel_configuration = reader.read(4) + self.sbr_present_flag = -1 + self.ps_present_flag = -1 + if self.audio_object_type in (5, 29): + self.extension_audio_object_type = 5 + self.sbc_present_flag = 1 + if self.audio_object_type == 29: + self.ps_present_flag = 1 + self.extension_sampling_frequency_index = reader.read(4) + if self.extension_sampling_frequency_index == 0xF: + self.extension_sampling_frequency = reader.read(24) + else: + self.extension_sampling_frequency = self.SAMPLING_FREQUENCIES[ + self.extension_sampling_frequency_index + ] + self.audio_object_type = AacAudioRtpPacket.audio_object_type(reader) + if self.audio_object_type == 22: + self.extension_channel_configuration = reader.read(4) + else: + self.extension_audio_object_type = 0 + + if self.audio_object_type in (1, 2, 3, 4, 6, 7, 17, 19, 20, 21, 22, 23): + ga_specific_config = AacAudioRtpPacket.GASpecificConfig( + reader, self.channel_configuration, self.audio_object_type + ) + else: + raise ValueError( + f'audioObjectType {self.audio_object_type} not supported' + ) + + # if self.extension_audio_object_type != 5 and bits_to_decode >= 16: + # sync_extension_type = reader.read(11) + # if sync_extension_type == 0x2B7: + # self.extension_audio_object_type = AacAudioRtpPacket.audio_object_type(reader) + # if self.extension_audio_object_type == 5: + # self.sbr_present_flag = reader.read(1) + # if self.sbr_present_flag: + # self.extension_sampling_frequency_index = reader.read(4) + # if self.extension_sampling_frequency_index == 0xF: + # self.extension_sampling_frequency = reader.read(24) + # else: + # self.extension_sampling_frequency = self.SAMPLING_FREQUENCIES[self.extension_sampling_frequency_index] + # if bits_to_decode >= 12: + # sync_extension_type = reader.read(11) + # if sync_extension_type == 0x548: + # self.ps_present_flag = reader.read(1) + # elif self.extension_audio_object_type == 22: + # self.sbr_present_flag = reader.read(1) + # if self.sbr_present_flag: + # self.extension_sampling_frequency_index = reader.read(4) + # if self.extension_sampling_frequency_index == 0xF: + # self.extension_sampling_frequency = reader.read(24) + # else: + # self.extension_sampling_frequency = self.SAMPLING_FREQUENCIES[self.extension_sampling_frequency_index] + # self.extension_channel_configuration = reader.read(4) + + @dataclass + class StreamMuxConfig: + other_data_present: int + other_data_len_bits: int + audio_specific_config: AacAudioRtpPacket.AudioSpecificConfig + + def __init__(self, reader: BitReader) -> None: + # StreamMuxConfig - ISO/EIC 14496-3 Table 1.42 + audio_mux_version = reader.read(1) + if audio_mux_version == 1: + audio_mux_version_a = reader.read(1) + else: + audio_mux_version_a = 0 + if audio_mux_version_a != 0: + raise ValueError('audioMuxVersionA != 0 not supported') + if audio_mux_version == 1: + tara_buffer_fullness = AacAudioRtpPacket.latm_value(reader) + stream_cnt = 0 + all_streams_same_time_framing = reader.read(1) + num_sub_frames = reader.read(6) + num_program = reader.read(4) + if num_program != 0: + raise ValueError('num_program != 0 not supported') + num_layer = reader.read(3) + if num_layer != 0: + raise ValueError('num_layer != 0 not supported') + if audio_mux_version == 0: + self.audio_specific_config = AacAudioRtpPacket.AudioSpecificConfig( + reader + ) + else: + asc_len = AacAudioRtpPacket.latm_value(reader) + marker = reader.bit_position + self.audio_specific_config = AacAudioRtpPacket.AudioSpecificConfig( + reader + ) + audio_specific_config_len = reader.bit_position - marker + if asc_len < audio_specific_config_len: + raise ValueError('audio_specific_config_len > asc_len') + asc_len -= audio_specific_config_len + reader.skip(asc_len) + frame_length_type = reader.read(3) + if frame_length_type == 0: + latm_buffer_fullness = reader.read(8) + elif frame_length_type == 1: + frame_length = reader.read(9) + else: + raise ValueError(f'frame_length_type {frame_length_type} not supported') + + self.other_data_present = reader.read(1) + if self.other_data_present: + if audio_mux_version == 1: + self.other_data_len_bits = AacAudioRtpPacket.latm_value(reader) + else: + self.other_data_len_bits = 0 + while True: + self.other_data_len_bits *= 256 + other_data_len_esc = reader.read(1) + self.other_data_len_bits += reader.read(8) + if other_data_len_esc == 0: + break + crc_check_present = reader.read(1) + if crc_check_present: + crc_checksum = reader.read(8) + + @dataclass + class AudioMuxElement: + payload: bytes + stream_mux_config: AacAudioRtpPacket.StreamMuxConfig + + def __init__(self, reader: BitReader, mux_config_present: int): + if mux_config_present == 0: + raise ValueError('muxConfigPresent == 0 not supported') + + # AudioMuxElement - ISO/EIC 14496-3 Table 1.41 + use_same_stream_mux = reader.read(1) + if use_same_stream_mux: + raise ValueError('useSameStreamMux == 1 not supported') + self.stream_mux_config = AacAudioRtpPacket.StreamMuxConfig(reader) + + # We only support: + # allStreamsSameTimeFraming == 1 + # audioMuxVersionA == 0, + # numProgram == 0 + # numSubFrames == 0 + # numLayer == 0 + + mux_slot_length_bytes = 0 + while True: + tmp = reader.read(8) + mux_slot_length_bytes += tmp + if tmp != 255: + break + + self.payload = reader.read_bytes(mux_slot_length_bytes) + + if self.stream_mux_config.other_data_present: + reader.skip(self.stream_mux_config.other_data_len_bits) + + # ByteAlign + while reader.bit_position % 8: + reader.read(1) + + def __init__(self, data: bytes) -> None: + # Parse the bit stream + reader = BitReader(data) + self.audio_mux_element = self.AudioMuxElement(reader, mux_config_present=1) + + def to_adts(self): + # pylint: disable=line-too-long + sampling_frequency_index = ( + self.audio_mux_element.stream_mux_config.audio_specific_config.sampling_frequency_index + ) + channel_configuration = ( + self.audio_mux_element.stream_mux_config.audio_specific_config.channel_configuration + ) + frame_size = len(self.audio_mux_element.payload) + return ( + bytes( + [ + 0xFF, + 0xF1, # 0xF9 (MPEG2) + 0x40 + | (sampling_frequency_index << 2) + | (channel_configuration >> 2), + ((channel_configuration & 0x3) << 6) | ((frame_size + 7) >> 11), + ((frame_size + 7) >> 3) & 0xFF, + (((frame_size + 7) << 5) & 0xFF) | 0x1F, + 0xFC, + ] + ) + + self.audio_mux_element.payload + ) diff --git a/bumble/device.py b/bumble/device.py index 62ec8c22..7a43061c 100644 --- a/bumble/device.py +++ b/bumble/device.py @@ -954,12 +954,16 @@ class Device(CompositeEventEmitter): config.load_from_file(filename) return cls(config=config) + @classmethod + def from_config_with_hci(cls, config, hci_source, hci_sink): + host = Host(controller_source=hci_source, controller_sink=hci_sink) + return cls(config=config, host=host) + @classmethod def from_config_file_with_hci(cls, filename, hci_source, hci_sink): config = DeviceConfiguration() config.load_from_file(filename) - host = Host(controller_source=hci_source, controller_sink=hci_sink) - return cls(config=config, host=host) + return cls.from_config_with_hci(config, hci_source, hci_sink) def __init__( self, @@ -2441,7 +2445,7 @@ class Device(CompositeEventEmitter): if result.status != HCI_COMMAND_STATUS_PENDING: logger.warning( - 'HCI_Set_Connection_Encryption_Command failed: ' + 'HCI_Remote_Name_Request_Command failed: ' f'{HCI_Constant.error_name(result.status)}' ) raise HCI_StatusError(result) diff --git a/bumble/hci.py b/bumble/hci.py index 9b5793d4..1916aa35 100644 --- a/bumble/hci.py +++ b/bumble/hci.py @@ -62,7 +62,7 @@ def map_null_terminated_utf8_string(utf8_bytes): try: terminator = utf8_bytes.find(0) if terminator < 0: - return utf8_bytes + terminator = len(utf8_bytes) return utf8_bytes[0:terminator].decode('utf8') except UnicodeDecodeError: return utf8_bytes @@ -1795,6 +1795,16 @@ class Address: def to_bytes(self): return self.address_bytes + def to_string(self, with_type_qualifier=True): + ''' + String representation of the address, MSB first, with an optional type + qualifier. + ''' + result = ':'.join([f'{x:02X}' for x in reversed(self.address_bytes)]) + if not with_type_qualifier or not self.is_public: + return result + return result + '/P' + def __bytes__(self): return self.to_bytes() @@ -1808,13 +1818,7 @@ class Address: ) def __str__(self): - ''' - String representation of the address, MSB first - ''' - result = ':'.join([f'{x:02X}' for x in reversed(self.address_bytes)]) - if not self.is_public: - return result - return result + '/P' + return self.to_string() # Predefined address values @@ -5373,7 +5377,7 @@ class HCI_AclDataPacket: def __str__(self): return ( f'{color("ACL", "blue")}: ' - f'handle=0x{self.connection_handle:04x}' + f'handle=0x{self.connection_handle:04x}, ' f'pb={self.pb_flag}, bc={self.bc_flag}, ' f'data_total_length={self.data_total_length}, ' f'data={self.data.hex()}' diff --git a/bumble/host.py b/bumble/host.py index afde2ee6..a33efc80 100644 --- a/bumble/host.py +++ b/bumble/host.py @@ -62,6 +62,7 @@ from .hci import ( HCI_Read_Local_Version_Information_Command, HCI_Reset_Command, HCI_Set_Event_Mask_Command, + map_null_terminated_utf8_string, ) from .core import ( BT_BR_EDR_TRANSPORT, @@ -887,7 +888,12 @@ class Host(AbortableEventEmitter): if event.status != HCI_SUCCESS: self.emit('remote_name_failure', event.bd_addr, event.status) else: - self.emit('remote_name', event.bd_addr, event.remote_name) + utf8_name = event.remote_name + terminator = utf8_name.find(0) + if terminator >= 0: + utf8_name = utf8_name[0:terminator] + + self.emit('remote_name', event.bd_addr, utf8_name) def on_hci_remote_host_supported_features_notification_event(self, event): self.emit( diff --git a/docs/mkdocs/mkdocs.yml b/docs/mkdocs/mkdocs.yml index 0ddc982b..fde6b40b 100644 --- a/docs/mkdocs/mkdocs.yml +++ b/docs/mkdocs/mkdocs.yml @@ -44,6 +44,7 @@ nav: - Overview: apps_and_tools/index.md - Console: apps_and_tools/console.md - Bench: apps_and_tools/bench.md + - Speaker: apps_and_tools/speaker.md - HCI Bridge: apps_and_tools/hci_bridge.md - Golden Gate Bridge: apps_and_tools/gg_bridge.md - Show: apps_and_tools/show.md diff --git a/docs/mkdocs/src/apps_and_tools/index.md b/docs/mkdocs/src/apps_and_tools/index.md index fe7af564..0c2b4d5e 100644 --- a/docs/mkdocs/src/apps_and_tools/index.md +++ b/docs/mkdocs/src/apps_and_tools/index.md @@ -11,4 +11,5 @@ These include: * [HCI Bridge](hci_bridge.md) - a HCI transport bridge to connect two HCI transports and filter/snoop the HCI packets * [Golden Gate Bridge](gg_bridge.md) - a bridge between GATT and UDP to use with the Golden Gate "stack tool" * [Show](show.md) - Parse a file with HCI packets and print the details of each packet in a human readable form + * [Speaker](speaker.md) - Virtual Bluetooth speaker, with a command line and browser-based UI. * [Link Relay](link_relay.md) - WebSocket relay for virtual RemoteLink instances to communicate with each other. diff --git a/docs/mkdocs/src/apps_and_tools/speaker.md b/docs/mkdocs/src/apps_and_tools/speaker.md new file mode 100644 index 00000000..5569b9d0 --- /dev/null +++ b/docs/mkdocs/src/apps_and_tools/speaker.md @@ -0,0 +1,86 @@ +SPEAKER APP +=========== + +![logo](../images/speaker_screenshot.png){ width=400 height=320 } + +The Speaker app is virtual Bluetooth speaker (A2DP sink). +The app runs as a command-line executable, but also offers an optional simple +web-browser-based user interface. + +# General Usage +You can invoke the app either as `bumble-speaker` when installed as command +from `pip`, or `python3 apps/speaker/speaker.py` when running from a source +distribution. + +``` +Usage: speaker.py [OPTIONS] TRANSPORT + + Run the speaker. + +Options: + --codec [sbc|aac] [default: aac] + --discover Discover remote endpoints once connected + --output NAME Send audio to this named output (may be used more + than once for multiple outputs) + --ui-port HTTP_PORT HTTP port for the UI server [default: 7654] + --connect ADDRESS_OR_NAME Address or name to connect to + --device-config FILENAME Device configuration file + --help Show this message and exit. +``` + +# Connection +By default, the virtual speaker will wait for another device (like a phone or +computer) to connect to it (and possibly pair). Alternatively, the speaker can +be told to initiate a connection to a remote device, using the `--connect` +option. + +# Outputs +The speaker can have one or more outputs. By default, the only output is a text +display on the console, as well as a browser-based user interface if connected. +In addition, a file output can be used, in which case the received audio data is +saved to a specified file. +Finally, if the host computer on which your are running the application has `ffplay` +as an available command line executable, the `@ffplay` output can be selected, in +which case the received audio will be played on the computer's builtin speakers via +a pipe to `ffplay`. (see the [ffplay documentation](https://www.ffmpeg.org/ffplay.html) +for details) + +# Web User Interface +When the speaker app starts, it prints out on the console the local URL at which you +may point a browser (Chrome recommended for full functionality). The console line +specifying the local UI URL will look like: +``` +UI HTTP server at http://127.0.0.1:7654 +``` + +By default, the web UI will show the status of the connection, as well as a realtime +graph of the received audio bandwidth. +In order to also hear the received audio, you need to click the `Audio on` button +(this is due to the fact that most browsers will require some user interface with the +page before granting access to the audio output APIs). + +# Examples + +In the following examples, we use a single USB Bluetooth controllers `usb:0`. Other +transports can be used of course. + +!!! example "Start the speaker and wait for a connection" + ``` + $ bumble-speaker usb:0 + ``` + +!!! example "Start the speaker and save the AAC audio to a file named `audio.aac`." + ``` + $ bumble-speaker --output audio.aac usb:0 + ``` + +!!! example "Start the speaker and save the SBC audio to a file named `audio.sbc`." + ``` + $ bumble-speaker --codec sbc --output audio.sbc usb:0 + ``` + +!!! example "Start the speaker and connect it to a phone at address `B8:7B:C5:05:57:ED`." + ``` + $ bumble-speaker --connect B8:7B:C5:05:57:ED usb:0 + ``` + diff --git a/docs/mkdocs/src/images/speaker_screenshot.png b/docs/mkdocs/src/images/speaker_screenshot.png new file mode 100644 index 00000000..fd34880e Binary files /dev/null and b/docs/mkdocs/src/images/speaker_screenshot.png differ diff --git a/examples/run_classic_connect.py b/examples/run_classic_connect.py index bb46bf75..3ae6ed8a 100644 --- a/examples/run_classic_connect.py +++ b/examples/run_classic_connect.py @@ -23,7 +23,7 @@ from bumble.colors import color from bumble.device import Device from bumble.transport import open_transport_or_link -from bumble.core import BT_BR_EDR_TRANSPORT, BT_L2CAP_PROTOCOL_ID +from bumble.core import BT_BR_EDR_TRANSPORT, BT_L2CAP_PROTOCOL_ID, CommandTimeoutError from bumble.sdp import ( Client as SDP_Client, SDP_PUBLIC_BROWSE_ROOT, @@ -48,62 +48,70 @@ async def main(): # Create a device device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) device.classic_enabled = True + device.le_enabled = False await device.power_on() - async def connect(target_address): - print(f'=== Connecting to {target_address}...') - connection = await device.connect(target_address, transport=BT_BR_EDR_TRANSPORT) - print(f'=== Connected to {connection.peer_address}!') - - # Connect to the SDP Server - sdp_client = SDP_Client(device) - await sdp_client.connect(connection) - - # List all services in the root browse group - service_record_handles = await sdp_client.search_services( - [SDP_PUBLIC_BROWSE_ROOT] - ) - print(color('\n==================================', 'blue')) - print(color('SERVICES:', 'yellow'), service_record_handles) - - # For each service in the root browse group, get all its attributes - for service_record_handle in service_record_handles: - attributes = await sdp_client.get_attributes( - service_record_handle, [SDP_ALL_ATTRIBUTES_RANGE] - ) - print(color(f'SERVICE {service_record_handle:04X} attributes:', 'yellow')) - for attribute in attributes: - print(' ', attribute.to_string(with_colors=True)) - - # Search for services with an L2CAP service attribute - search_result = await sdp_client.search_attributes( - [BT_L2CAP_PROTOCOL_ID], [SDP_ALL_ATTRIBUTES_RANGE] - ) - print(color('\n==================================', 'blue')) - print(color('SEARCH RESULTS:', 'yellow')) - for attribute_list in search_result: - print(color('SERVICE:', 'green')) - print( - ' ' - + '\n '.join( - [ - attribute.to_string(with_colors=True) - for attribute in attribute_list - ] + async def connect(target_address): + print(f'=== Connecting to {target_address}...') + try: + connection = await device.connect( + target_address, transport=BT_BR_EDR_TRANSPORT ) + except CommandTimeoutError: + print('!!! Connection timed out') + return + print(f'=== Connected to {connection.peer_address}!') + + # Connect to the SDP Server + sdp_client = SDP_Client(device) + await sdp_client.connect(connection) + + # List all services in the root browse group + service_record_handles = await sdp_client.search_services( + [SDP_PUBLIC_BROWSE_ROOT] ) + print(color('\n==================================', 'blue')) + print(color('SERVICES:', 'yellow'), service_record_handles) - await sdp_client.disconnect() - await hci_source.wait_for_termination() + # For each service in the root browse group, get all its attributes + for service_record_handle in service_record_handles: + attributes = await sdp_client.get_attributes( + service_record_handle, [SDP_ALL_ATTRIBUTES_RANGE] + ) + print( + color(f'SERVICE {service_record_handle:04X} attributes:', 'yellow') + ) + for attribute in attributes: + print(' ', attribute.to_string(with_colors=True)) - # Connect to a peer - target_addresses = sys.argv[3:] - await asyncio.wait( - [ - asyncio.create_task(connect(target_address)) - for target_address in target_addresses - ] - ) + # Search for services with an L2CAP service attribute + search_result = await sdp_client.search_attributes( + [BT_L2CAP_PROTOCOL_ID], [SDP_ALL_ATTRIBUTES_RANGE] + ) + print(color('\n==================================', 'blue')) + print(color('SEARCH RESULTS:', 'yellow')) + for attribute_list in search_result: + print(color('SERVICE:', 'green')) + print( + ' ' + + '\n '.join( + [ + attribute.to_string(with_colors=True) + for attribute in attribute_list + ] + ) + ) + + await sdp_client.disconnect() + + # Connect to a peer + target_addresses = sys.argv[3:] + await asyncio.wait( + [ + asyncio.create_task(connect(target_address)) + for target_address in target_addresses + ] + ) # ----------------------------------------------------------------------------- diff --git a/examples/speaker.json b/examples/speaker.json new file mode 100644 index 00000000..61ce80d4 --- /dev/null +++ b/examples/speaker.json @@ -0,0 +1,5 @@ +{ + "name": "Bumble Speaker", + "class_of_device": 2360324, + "keystore": "JsonKeyStore" +} diff --git a/setup.cfg b/setup.cfg index 45c72645..d7745687 100644 --- a/setup.cfg +++ b/setup.cfg @@ -30,22 +30,23 @@ package_dir = bumble.apps = apps include-package-data = True install_requires = + aiohttp >= 3.8.4; platform_system!='Emscripten' appdirs >= 1.4 + bt-test-interfaces >= 0.0.2 click >= 7.1.2; platform_system!='Emscripten' cryptography == 35; platform_system!='Emscripten' grpcio == 1.51.1; platform_system!='Emscripten' + humanize >= 4.6.0 libusb1 >= 2.0.1; platform_system!='Emscripten' libusb-package == 1.0.26.1; platform_system!='Emscripten' prompt_toolkit >= 3.0.16; platform_system!='Emscripten' + prettytable >= 3.6.0 protobuf >= 3.12.4 pyee >= 8.2.2 pyserial-asyncio >= 0.5; platform_system!='Emscripten' pyserial >= 3.5; platform_system!='Emscripten' pyusb >= 1.2; platform_system!='Emscripten' websockets >= 8.1; platform_system!='Emscripten' - prettytable >= 3.6.0 - humanize >= 4.6.0 - bt-test-interfaces >= 0.0.2 [options.entry_points] console_scripts = @@ -61,6 +62,7 @@ console_scripts = bumble-usb-probe = bumble.apps.usb_probe:main bumble-link-relay = bumble.apps.link_relay.link_relay:main bumble-bench = bumble.apps.bench:main + bumble-speaker = bumble.apps.speaker.speaker:main bumble-pandora-server = bumble.apps.pandora_server:main [options.package_data] diff --git a/speaker.html b/speaker.html new file mode 100644 index 00000000..05cc31f8 --- /dev/null +++ b/speaker.html @@ -0,0 +1,28 @@ + + + + Audio WAV Player + + +

Audio WAV Player

+ + + + + diff --git a/tests/codecs_test.py b/tests/codecs_test.py new file mode 100644 index 00000000..b8affada --- /dev/null +++ b/tests/codecs_test.py @@ -0,0 +1,67 @@ +# Copyright 2021-2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +import pytest +from bumble.codecs import AacAudioRtpPacket, BitReader + + +# ----------------------------------------------------------------------------- +def test_reader(): + reader = BitReader(b'') + with pytest.raises(ValueError): + reader.read(1) + + reader = BitReader(b'hello') + with pytest.raises(ValueError): + reader.read(40) + + reader = BitReader(bytes([0xFF])) + assert reader.read(1) == 1 + with pytest.raises(ValueError): + reader.read(10) + + reader = BitReader(bytes([0x78])) + value = 0 + for _ in range(8): + value = (value << 1) | reader.read(1) + assert value == 0x78 + + data = bytes([x & 0xFF for x in range(66 * 100)]) + reader = BitReader(data) + value = 0 + for _ in range(100): + for bits in range(1, 33): + value = value << bits | reader.read(bits) + assert value == int.from_bytes(data, byteorder='big') + + +def test_aac_rtp(): + # pylint: disable=line-too-long + packet_data = bytes.fromhex( + '47fc0000b090800300202066000198000de120000000000000000000000000000000000000000000001c' + ) + packet = AacAudioRtpPacket(packet_data) + adts = packet.to_adts() + assert adts == bytes.fromhex( + 'fff1508004fffc2066000198000de120000000000000000000000000000000000000000000001c' + ) + + +# ----------------------------------------------------------------------------- +if __name__ == '__main__': + test_reader() + test_aac_rtp()