diff --git a/apps/speaker/logo.svg b/apps/speaker/logo.svg new file mode 100644 index 00000000..70ef7a90 --- /dev/null +++ b/apps/speaker/logo.svg @@ -0,0 +1,42 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/apps/speaker/speaker.css b/apps/speaker/speaker.css new file mode 100644 index 00000000..2d3bcfbf --- /dev/null +++ b/apps/speaker/speaker.css @@ -0,0 +1,66 @@ +body, h1, h2, h3, h4, h5, h6 { + font-family: sans-serif; +} + +#controlsDiv { + margin: 6px; +} + +#connectionText { + background-color: rgb(239, 89, 75); + border: none; + border-radius: 4px; + padding: 8px; + display: inline-block; + margin: 4px; +} + +#startButton { + padding: 4px; + margin: 6px; +} + +#fftCanvas { + border-radius: 16px; + margin: 6px; +} + +#bandwidthCanvas { + border: grey; + border-style: solid; + border-radius: 8px; + margin: 6px; +} + +#streamStateText { + background-color: rgb(93, 165, 93); + border: none; + border-radius: 8px; + padding: 10px 20px; + display: inline-block; + margin: 6px; +} + +#propertiesTable { + border: grey; + border-style: solid; + border-radius: 4px; + padding: 4px; + margin: 6px; +} + +th, td { + padding-left: 8px; + padding-right: 8px; +} + +.properties td:nth-child(even) { + background-color: #D6EEEE; + font-family: monospace; +} + +.properties td:nth-child(odd) { + font-weight: bold; +} + +.properties tr td:nth-child(2) { width: 150px; } \ No newline at end of file diff --git a/apps/speaker/speaker.html b/apps/speaker/speaker.html new file mode 100644 index 00000000..4786aabe --- /dev/null +++ b/apps/speaker/speaker.html @@ -0,0 +1,33 @@ + + + + Bumble Speaker + + + + +

Bumble Virtual Speaker

+
+
+ + + +
+ + + + +
Codec
Packets
Bytes
+
+ Bandwidth Graph +
+ IDLE +
+ + +
+ Audio Frequencies Animation + +
+ + \ No newline at end of file diff --git a/apps/speaker/speaker.js b/apps/speaker/speaker.js new file mode 100644 index 00000000..68094d46 --- /dev/null +++ b/apps/speaker/speaker.js @@ -0,0 +1,297 @@ +(function () { + 'use strict'; + +const channelUrl = ((window.location.protocol === "https:") ? "wss://" : "ws://") + window.location.host + "/channel"; +let channelSocket; +let connectionText; +let codecText; +let packetsReceivedText; +let bytesReceivedText; +let streamStateText; +let controlsDiv; +let audioOnButton; +let mediaSource; +let sourceBuffer; +let audioElement; +let audioContext; +let audioAnalyzer; +let audioFrequencyBinCount; +let audioFrequencyData; +let packetsReceived = 0; +let bytesReceived = 0; +let audioState = "stopped"; +let streamState = "IDLE"; +let audioSupportMessageText; +let fftCanvas; +let fftCanvasContext; +let bandwidthCanvas; +let bandwidthCanvasContext; +let bandwidthBinCount; +let bandwidthBins; + +const FFT_WIDTH = 800; +const FFT_HEIGHT = 256; +const BANDWIDTH_WIDTH = 500; +const BANDWIDTH_HEIGHT = 100; + +function hexToBytes(hex) { + return Uint8Array.from(hex.match(/.{1,2}/g).map((byte) => parseInt(byte, 16))); +} + +function init() { + initUI(); + initMediaSource(); + initAudioElement(); + initAnalyzer(); + + connect(); +} + +function initUI() { + controlsDiv = document.getElementById("controlsDiv"); + controlsDiv.style.visibility = "hidden"; + connectionText = document.getElementById("connectionText"); + audioOnButton = document.getElementById("audioOnButton"); + codecText = document.getElementById("codecText"); + packetsReceivedText = document.getElementById("packetsReceivedText"); + bytesReceivedText = document.getElementById("bytesReceivedText"); + streamStateText = document.getElementById("streamStateText"); + audioSupportMessageText = document.getElementById("audioSupportMessageText"); + + audioOnButton.onclick = () => startAudio(); + + setConnectionText(""); +} + +function initMediaSource() { + mediaSource = new 
MediaSource(); + mediaSource.onsourceopen = onMediaSourceOpen; + mediaSource.onsourceclose = onMediaSourceClose; + mediaSource.onsourceended = onMediaSourceEnd; +} + +function initAudioElement() { + audioElement = document.getElementById("audio"); + audioElement.src = URL.createObjectURL(mediaSource); + // audioElement.controls = true; +} + +function initAnalyzer() { + fftCanvas = document.getElementById("fftCanvas"); + fftCanvas.width = FFT_WIDTH + fftCanvas.height = FFT_HEIGHT + fftCanvasContext = fftCanvas.getContext('2d'); + fftCanvasContext.fillStyle = "rgb(0, 0, 0)"; + fftCanvasContext.fillRect(0, 0, FFT_WIDTH, FFT_HEIGHT); + + bandwidthCanvas = document.getElementById("bandwidthCanvas"); + bandwidthCanvas.width = BANDWIDTH_WIDTH + bandwidthCanvas.height = BANDWIDTH_HEIGHT + bandwidthCanvasContext = bandwidthCanvas.getContext('2d'); + bandwidthCanvasContext.fillStyle = "rgb(255, 255, 255)"; + bandwidthCanvasContext.fillRect(0, 0, BANDWIDTH_WIDTH, BANDWIDTH_HEIGHT); +} + +function startAnalyzer() { + // FFT + audioContext = new AudioContext(); + audioAnalyzer = audioContext.createAnalyser(); + audioAnalyzer.fftSize = 128; + audioFrequencyBinCount = audioAnalyzer.frequencyBinCount; + audioFrequencyData = new Uint8Array(audioFrequencyBinCount); + const stream = audioElement.captureStream(); + const source = audioContext.createMediaStreamSource(stream); + source.connect(audioAnalyzer); + + // Bandwidth + bandwidthBinCount = BANDWIDTH_WIDTH / 2; + bandwidthBins = []; + + requestAnimationFrame(onAnimationFrame); +} + +function setConnectionText(message) { + connectionText.innerText = message; + if (message.length == 0) { + connectionText.style.display = "none"; + } else { + connectionText.style.display = "inline-block"; + } +} + +function onAnimationFrame() { + // FFT + audioAnalyzer.getByteFrequencyData(audioFrequencyData); + fftCanvasContext.fillStyle = "rgb(0, 0, 0)"; + fftCanvasContext.fillRect(0, 0, FFT_WIDTH, FFT_HEIGHT); + const barCount = 
audioFrequencyBinCount; + const barWidth = (FFT_WIDTH / audioFrequencyBinCount) - 1; + for (let bar = 0; bar < barCount; bar++) { + const barHeight = audioFrequencyData[bar]; + fftCanvasContext.fillStyle = `rgb(${barHeight / 256 * 200 + 50}, 50, ${50 + 2 * bar})`; + fftCanvasContext.fillRect(bar * (barWidth + 1), FFT_HEIGHT - barHeight, barWidth, barHeight); + } + + // Bandwidth + bandwidthCanvasContext.fillStyle = "rgb(255, 255, 255)"; + bandwidthCanvasContext.fillRect(0, 0, BANDWIDTH_WIDTH, BANDWIDTH_HEIGHT); + bandwidthCanvasContext.fillStyle = `rgb(100, 100, 100)`; + for (let t = 0; t < bandwidthBins.length; t++) { + const lineHeight = (bandwidthBins[t] / 1000) * BANDWIDTH_HEIGHT; + bandwidthCanvasContext.fillRect(t * 2, BANDWIDTH_HEIGHT - lineHeight, 2, lineHeight); + } + + // Display again at the next frame + requestAnimationFrame(onAnimationFrame); +} + +function onMediaSourceOpen() { + console.log(this.readyState); + sourceBuffer = mediaSource.addSourceBuffer("audio/aac"); +} + +function onMediaSourceClose() { + console.log(this.readyState); +} + +function onMediaSourceEnd() { + console.log(this.readyState); +} + +async function startAudio() { + try { + console.log("starting audio..."); + audioOnButton.disabled = true; + audioState = "starting"; + await audioElement.play(); + console.log("audio started"); + audioState = "playing"; + startAnalyzer(); + } catch(error) { + console.error(`play failed: ${error}`); + audioState = "stopped"; + audioOnButton.disabled = false; + } +} + +function onAudioPacket(packet) { + if (audioState == "stopped") { + // Drop the packet, we're not ready to play. + return; + } + + // Queue the audio packet. 
+ sourceBuffer.appendBuffer(packet); + + packetsReceived += 1; + packetsReceivedText.innerText = packetsReceived; + bytesReceived += packet.byteLength; + bytesReceivedText.innerText = bytesReceived; + + bandwidthBins[bandwidthBins.length] = packet.byteLength; + if (bandwidthBins.length > bandwidthBinCount) { + bandwidthBins.shift(); + } +} + +function onChannelOpen() { + console.log('channel OPEN'); + setConnectionText(""); + controlsDiv.style.visibility = "visible"; + + // Handshake with the backend. + sendMessage({ + type: "hello" + }); +} + +function onChannelClose() { + console.log('channel CLOSED'); + setConnectionText("Connection to CLI app closed, restart it and reload this page."); + controlsDiv.style.visibility = "hidden"; +} + +function onChannelError(error) { + console.log(`channel ERROR: ${error}`); + setConnectionText(`Connection to CLI app error ({${error}}), restart it and reload this page.`); + controlsDiv.style.visibility = "hidden"; +} + +function onChannelMessage(message) { + if (typeof message.data === 'string' || message.data instanceof String) { + // JSON message. + const jsonMessage = JSON.parse(message.data); + console.log(`channel MESSAGE: ${message.data}`); + + // Dispatch the message. + const handlerName = `on${jsonMessage.type.charAt(0).toUpperCase()}${jsonMessage.type.slice(1)}Message` + const handler = messageHandlers[handlerName]; + if (handler !== undefined) { + const params = jsonMessage.params; + if (params === undefined) { + params = {}; + } + handler(params); + } else { + console.warn(`unhandled message: ${jsonMessage.type}`) + } + } else { + // BINARY audio data. 
+ onAudioPacket(message.data); + } +} + +function onHelloMessage(params) { + codecText.innerText = params.codec; + if (params.codec != "aac") { + audioOnButton.disabled = true; + audioSupportMessageText.innerText = "Only AAC can be played, audio will be disabled"; + audioSupportMessageText.style.display = "inline-block"; + } else { + audioSupportMessageText.innerText = ""; + audioSupportMessageText.style.display = "none"; + } +} + +function onStartMessage(params) { + streamState = "STARTED"; + streamStateText.innerText = streamState; +} + +function onStopMessage(params) { + streamState = "STOPPED"; + streamStateText.innerText = streamState; +} + +function onSuspendMessage(params) { + streamState = "SUSPENDED"; + streamStateText.innerText = streamState; +} + +function sendMessage(message) { + channelSocket.send(JSON.stringify(message)); +} + +function connect() { + console.log("connecting to CLI app"); + + channelSocket = new WebSocket(channelUrl); + channelSocket.binaryType = "arraybuffer"; + channelSocket.onopen = onChannelOpen; + channelSocket.onclose = onChannelClose; + channelSocket.onerror = onChannelError; + channelSocket.onmessage = onChannelMessage; +} + +const messageHandlers = { + onHelloMessage, + onStartMessage, + onStopMessage, + onSuspendMessage +} + +window.onload = (event) => { + init(); +} + +}()); \ No newline at end of file diff --git a/apps/speaker/speaker.py b/apps/speaker/speaker.py index 67afc3d0..3bf17ddc 100644 --- a/apps/speaker/speaker.py +++ b/apps/speaker/speaker.py @@ -15,19 +15,32 @@ # ----------------------------------------------------------------------------- # Imports # ----------------------------------------------------------------------------- +from __future__ import annotations import asyncio +import asyncio.subprocess +from importlib import resources +import json import os import logging +import pathlib +from typing import Dict, List, Optional +import weakref import click -from bumble.core import BT_BR_EDR_TRANSPORT +import 
aiohttp +from aiohttp import web +import bumble +from bumble.colors import color +from bumble.core import BT_BR_EDR_TRANSPORT from bumble.device import Device, DeviceConfiguration +from bumble.sdp import ServiceAttribute from bumble.transport import open_transport from bumble.avdtp import ( AVDTP_AUDIO_MEDIA_TYPE, Listener, MediaCodecCapabilities, + MediaPacket, Protocol, ) from bumble.a2dp import ( @@ -42,22 +55,323 @@ from bumble.a2dp import ( SBC_STEREO_CHANNEL_MODE, SBC_JOINT_STEREO_CHANNEL_MODE, SbcMediaCodecInformation, - AacMediaCodecInformation + AacMediaCodecInformation, ) from bumble.utils import AsyncRunner +from bumble.codecs import AacAudioRtpPacket + + +# ----------------------------------------------------------------------------- +# Constants +# ----------------------------------------------------------------------------- +DEFAULT_UI_PORT = 7654 + +# ----------------------------------------------------------------------------- +class AudioExtractor: + @staticmethod + def create(codec: str): + if codec == 'aac': + return AacAudioExtractor() + if codec == 'sbc': + return SbcAudioExtractor() + + def extract_audio(self, packet: MediaPacket) -> bytes: + raise NotImplementedError() + + +# ----------------------------------------------------------------------------- +class AacAudioExtractor: + def extract_audio(self, packet: MediaPacket) -> bytes: + return AacAudioRtpPacket(packet.payload).to_adts() + + +# ----------------------------------------------------------------------------- +class SbcAudioExtractor: + def extract_audio(self, packet: MediaPacket) -> bytes: + # header = packet.payload[0] + # fragmented = header >> 7 + # start = (header >> 6) & 0x01 + # last = (header >> 5) & 0x01 + # number_of_frames = header & 0x0F + + # TODO: support fragmented payloads + return packet.payload[1:] + + +# ----------------------------------------------------------------------------- +class Output: + async def start(self): + pass + + async def stop(self): + pass + + 
async def suspend(self): + pass + + +# ----------------------------------------------------------------------------- +class FileOutput(Output): + filename: str + codec: str + extractor: AudioExtractor + + def __init__(self, filename, codec): + self.filename = filename + self.codec = codec + self.file = open(filename, 'wb') + self.extractor = AudioExtractor.create(codec) + + def on_rtp_packet(self, packet: MediaPacket) -> None: + self.file.write(self.extractor.extract_audio(packet)) + + +# ----------------------------------------------------------------------------- +class QueuedOutput(Output): + MAX_QUEUE_SIZE = 32768 + + packets: asyncio.Queue + extractor: AudioExtractor + packet_pump_task: Optional[asyncio.Task] + started: bool + + def __init__(self, extractor): + self.extractor = extractor + self.packets = asyncio.Queue() + self.packet_pump_task = None + self.started = False + + async def start(self): + if self.started: + return + + self.packet_pump_task = asyncio.create_task(self.pump_packets()) + + async def pump_packets(self): + while True: + packet = await self.packets.get() + await self.on_audio_packet(packet) + + async def on_audio_packet(self, packet: bytes) -> None: + pass + + def on_rtp_packet(self, packet: MediaPacket) -> None: + if self.packets.qsize() > self.MAX_QUEUE_SIZE: + print("queue full, dropping") + return + + self.packets.put_nowait(self.extractor.extract_audio(packet)) + + +# ----------------------------------------------------------------------------- +class WebSocketOutput(QueuedOutput): + def __init__(self, codec, send_audio, send_message): + super().__init__(AudioExtractor.create(codec)) + self.send_audio = send_audio + self.send_message = send_message + + async def on_audio_packet(self, packet: bytes) -> None: + await self.send_audio(packet) + + async def start(self): + await super().start() + await self.send_message('start') + + async def stop(self): + await super().stop() + await self.send_message('stop') + + async def suspend(self): 
+ await super().suspend() + await self.send_message('suspend') + + +# ----------------------------------------------------------------------------- +class FfplayOutput(QueuedOutput): + MAX_QUEUE_SIZE = 32768 + + subprocess: Optional[asyncio.subprocess.Process] + ffplay_task: Optional[asyncio.Task] + + def __init__(self) -> None: + super().__init__(AacAudioExtractor()) + self.subprocess = None + self.ffplay_task = None + + async def start(self): + if self.started: + return + + super().start() + + self.subprocess = await asyncio.create_subprocess_shell( + 'ffplay -acodec aac pipe:0', + stdin=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + self.ffplay_task = asyncio.create_task(self.monitor_ffplay()) + + async def stop(self): + # TODO + pass + + async def suspend(self): + # TODO + pass + + async def monitor_ffplay(self): + async def read_stream(name, stream): + while True: + data = await stream.read() + print(f'{name}:', data) + + await asyncio.wait( + [ + asyncio.create_task( + read_stream('[ffplay stdout]', self.subprocess.stdout) + ), + asyncio.create_task( + read_stream('[ffplay stderr]', self.subprocess.stderr) + ), + asyncio.create_task(self.subprocess.wait()), + ] + ) + print("FFPLAY done") + + async def on_audio_packet(self, packet): + try: + self.subprocess.stdin.write(packet) + except Exception: + print('!!!! 
exception while sending audio to ffplay pipe') + + +# ----------------------------------------------------------------------------- +class UiServer: + speaker: Speaker + port: int + + def __init__(self, speaker: Speaker, port: int) -> None: + self.speaker = weakref.ref(speaker) + self.port = port + self.channel_socket = None + + async def start_http(self) -> None: + """Start the UI HTTP server.""" + + app = web.Application() + app.add_routes( + [ + web.get('/', self.get_static), + web.get('/speaker.html', self.get_static), + web.get('/speaker.js', self.get_static), + web.get('/speaker.css', self.get_static), + web.get('/logo.svg', self.get_static), + web.get('/channel', self.get_channel), + ] + ) + + runner = web.AppRunner(app) + await runner.setup() + site = web.TCPSite(runner, 'localhost', self.port) + print('UI HTTP server at ' + color(f'http://127.0.0.1:{self.port}', 'green')) + await site.start() + + async def get_static(self, request): + path = request.path + if path == '/': + path = '/speaker.html' + if path.endswith('.html'): + content_type = 'text/html' + elif path.endswith('.js'): + content_type = 'text/javascript' + elif path.endswith('.css'): + content_type = 'text/css' + elif path.endswith('.svg'): + content_type = 'image/svg+xml' + else: + content_type = 'text/plain' + text = ( + resources.files("bumble.apps.speaker") + .joinpath(pathlib.Path(path).relative_to('/')) + .read_text(encoding="utf-8") + ) + return aiohttp.web.Response(text=text, content_type=content_type) + + async def get_channel(self, request): + ws = web.WebSocketResponse() + await ws.prepare(request) + + # Process messages until the socket is closed. 
+ self.channel_socket = ws + async for message in ws: + if message.type == aiohttp.WSMsgType.TEXT: + print(f'<<< received message: {message.data}') + await self.on_message(message.data) + elif message.type == aiohttp.WSMsgType.ERROR: + print(f'channel connection closed with exception {ws.exception()}') + + self.channel_socket = None + print('--- channel connection closed') + + return ws + + async def on_message(self, message_str: str): + # Parse the message as JSON + message = json.loads(message_str) + + # Dispatch the message + message_type = message['type'] + message_params = message.get('params', {}) + handler = getattr(self, f'on_{message_type}_message') + if handler: + await handler(**message_params) + + async def on_hello_message(self): + print('HELLO') + await self.send_message( + 'hello', bumble_version=bumble.__version__, codec=self.speaker().codec + ) + + async def send_message(self, message_type: str, **kwargs) -> None: + if self.channel_socket is None: + return + + message = {'type': message_type, 'params': kwargs} + await self.channel_socket.send_json(message) + + async def send_audio(self, data: bytes) -> None: + if self.channel_socket is None: + return + + await self.channel_socket.send_bytes(data) # ----------------------------------------------------------------------------- class Speaker: - def __init__(self, transport, discover): + def __init__(self, transport, codec, discover, outputs, ui_port): self.transport = transport + self.codec = codec self.discover = discover + self.ui_port = ui_port self.device = None self.listener = None - self.output_filename = 'speaker_output.sbc' - self.output = None + self.packets_received = 0 + self.bytes_received = 0 + self.outputs = [] + for output in outputs: + if output == '@ffplay': + self.outputs.append(FfplayOutput()) + continue - def sdp_records(self): + # Default to FileOutput + self.outputs.append(FileOutput(output, codec)) + + # Create an HTTP server for the UI + self.ui_server = UiServer(speaker=self, 
port=ui_port) + + def sdp_records(self) -> Dict[int, List[ServiceAttribute]]: service_record_handle = 0x00010001 return { service_record_handle: make_audio_sink_service_sdp_records( @@ -65,23 +379,29 @@ class Speaker: ) } - def codec_capabilities(self): - return self.aac_codec_capabilities() + def codec_capabilities(self) -> MediaCodecCapabilities: + if self.codec == 'aac': + return self.aac_codec_capabilities() - def aac_codec_capabilities(self): + if self.codec == 'sbc': + return self.sbc_codec_capabilities() + + raise RuntimeError('unsupported codec') + + def aac_codec_capabilities(self) -> MediaCodecCapabilities: return MediaCodecCapabilities( media_type=AVDTP_AUDIO_MEDIA_TYPE, media_codec_type=A2DP_MPEG_2_4_AAC_CODEC_TYPE, media_codec_information=AacMediaCodecInformation.from_lists( object_types=[MPEG_2_AAC_LC_OBJECT_TYPE], sampling_frequencies=[48000, 44100], - channels=[1,2], + channels=[1, 2], vbr=1, - bitrate=256000 - ) + bitrate=256000, + ), ) - def sbc_codec_capabilities(self): + def sbc_codec_capabilities(self) -> MediaCodecCapabilities: return MediaCodecCapabilities( media_type=AVDTP_AUDIO_MEDIA_TYPE, media_codec_type=A2DP_SBC_CODEC_TYPE, @@ -137,12 +457,18 @@ class Speaker: def on_sink_start(self): print("Sink Start") + for output in self.outputs: + AsyncRunner.spawn(output.start()) def on_sink_stop(self): print("Sink Stop") + for output in self.outputs: + AsyncRunner.spawn(output.stop()) def on_sink_suspend(self): print("Sink Suspend") + for output in self.outputs: + AsyncRunner.spawn(output.suspend()) def on_sink_configuration(self, config): print("Sink Configuration:") @@ -155,21 +481,15 @@ class Speaker: print("RTP Channel Closed") def on_rtp_packet(self, packet): - # header = packet.payload[0] - # fragmented = header >> 7 - # # start = (header >> 6) & 0x01 - # # last = (header >> 5) & 0x01 - # number_of_frames = header & 0x0F + self.packets_received += 1 + self.bytes_received += len(packet.payload) + print( + f'[{self.bytes_received} bytes in 
{self.packets_received} packets] {packet}', + end='\r', + ) - # payload = packet.payload[1:] - # payload_size = len(payload) - # if fragmented: - # print(f'RTP: fragment {payload_size} bytes in {number_of_frames} frames') - # else: - # print(f'RTP: {payload_size} bytes in {number_of_frames} frames') - print(packet.payload.hex()) - - self.output.write(packet.payload) + for output in self.outputs: + output.on_rtp_packet(packet) async def advertise(self): await self.device.set_discoverable(True) @@ -178,9 +498,7 @@ class Speaker: async def connect(self, address): # Connect to the source print(f'=== Connecting to {address}...') - connection = await self.device.connect( - address, transport=BT_BR_EDR_TRANSPORT - ) + connection = await self.device.connect(address, transport=BT_BR_EDR_TRANSPORT) print(f'=== Connected to {connection.peer_address}') self.on_bluetooth_connection(connection) @@ -205,71 +523,117 @@ class Speaker: print('@@@', endpoint) async def run(self, connect_address): + print(f'Speaker ready to play, codec={color(self.codec, "cyan")}') + await self.ui_server.start_http() + self.outputs.append( + WebSocketOutput( + self.codec, self.ui_server.send_audio, self.ui_server.send_message + ) + ) + async with await open_transport(self.transport) as (hci_source, hci_sink): - with open(self.output_filename, 'wb') as sbc_file: - self.output = sbc_file + # Create a device + device_config = DeviceConfiguration() + device_config.name = "Bumble Speaker" + device_config.class_of_device = 0x240404 + device_config.keystore = "JsonKeyStore" + device_config.classic_enabled = True + device_config.le_enabled = False + self.device = Device.from_config_with_hci( + device_config, hci_source, hci_sink + ) - # Create a device - device_config = DeviceConfiguration() - device_config.name = "Bumble Speaker" - device_config.class_of_device = 2360324 - device_config.keystore = "JsonKeyStore" - device_config.classic_enabled = True - device_config.le_enabled = False - self.device = 
Device.from_config_with_hci( - device_config, hci_source, hci_sink - ) + # Setup the SDP to expose the sink service + self.device.sdp_service_records = self.sdp_records() - # Setup the SDP to expose the sink service - self.device.sdp_service_records = self.sdp_records() + # Start the controller + await self.device.power_on() - # Start the controller - await self.device.power_on() + # Listen for Bluetooth connections + self.device.on('connection', self.on_bluetooth_connection) - # Listen for Bluetooth connections - self.device.on('connection', self.on_bluetooth_connection); + # Create a listener to wait for AVDTP connections + self.listener = Listener(Listener.create_registrar(self.device)) + self.listener.on('connection', self.on_avdtp_connection) - # Create a listener to wait for AVDTP connections - self.listener = Listener(Listener.create_registrar(self.device)) - self.listener.on('connection', self.on_avdtp_connection) + if connect_address: + # Connect to the source + await self.connect(connect_address) + else: + # Start being discoverable and connectable + print("Waiting for connection...") + await self.advertise() - if connect_address: - # Connect to the source - await self.connect(connect_address) - else: - # Start being discoverable and connectable - await self.advertise() + await hci_source.wait_for_termination() - await hci_source.wait_for_termination() + for output in self.outputs: + await output.stop() # ----------------------------------------------------------------------------- @click.group() @click.option('--device-config', metavar='FILENAME', help='Device configuration file') @click.pass_context -def speaker(ctx, device_config): +def speaker_cli(ctx, device_config): ctx.ensure_object(dict) ctx.obj['device_config'] = device_config -@speaker.command() +@speaker_cli.command() @click.argument('transport') +@click.option( + '--codec', type=click.Choice(['sbc', 'aac']), default='aac', show_default=True +) @click.option( '--connect', 'connect_address', 
metavar='ADDRESS_OR_NAME', help='Address or name to connect to', ) -@click.option('--discover', is_flag=True) +@click.option( + '--discover', is_flag=True, help='Discover remote endpoints once connected' +) +@click.option( + '--output', + multiple=True, + metavar='NAME', + help=( + 'Send audio to this named output ' + '(may be used more than once for multiple outputs)' + ), +) +@click.option( + '--ui-port', + 'ui_port', + metavar='HTTP_PORT', + default=DEFAULT_UI_PORT, + show_default=True, + help='HTTP port for the UI server', +) @click.pass_context -def play(ctx, transport, connect_address, discover): - asyncio.run(Speaker(transport, discover).run(connect_address)) +def play(ctx, transport, codec, connect_address, discover, output, ui_port): + """Run the speaker in playback mode.""" + + # ffplay only works with AAC for now + if codec != 'aac' and '@ffplay' in output: + print( + color( + f'{codec} not supported with @ffplay output, ' + '@ffplay output will be skipped', + 'yellow', + ) + ) + output = list(filter(lambda x: x != '@ffplay', output)) + + asyncio.run( + Speaker(transport, codec, discover, output, ui_port).run(connect_address) + ) # ----------------------------------------------------------------------------- def main(): logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()) - speaker() + speaker_cli() # ----------------------------------------------------------------------------- diff --git a/bumble/codecs.py b/bumble/codecs.py new file mode 100644 index 00000000..1d7ae82c --- /dev/null +++ b/bumble/codecs.py @@ -0,0 +1,381 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +from __future__ import annotations +from dataclasses import dataclass + + +# ----------------------------------------------------------------------------- +class BitReader: + """Simple but not optimized bit stream reader.""" + + data: bytes + bytes_position: int + bit_position: int + cache: int + bits_cached: int + + def __init__(self, data: bytes): + self.data = data + self.byte_position = 0 + self.bit_position = 0 + self.cache = 0 + self.bits_cached = 0 + + def read(self, bits: int) -> int: + """ "Read up to 32 bits.""" + + if bits > 32: + raise ValueError('maximum read size is 32') + + if self.bits_cached >= bits: + # We have enough bits. 
+ self.bits_cached -= bits + self.bit_position += bits + return (self.cache >> self.bits_cached) & ((1 << bits) - 1) + + # Read more cache, up to 32 bits + feed_bytes = self.data[self.byte_position : self.byte_position + 4] + feed_size = len(feed_bytes) + feed_int = int.from_bytes(feed_bytes, byteorder='big') + if 8 * feed_size + self.bits_cached < bits: + raise ValueError('trying to read past the data') + self.byte_position += feed_size + + # Combine the new cache and the old cache + cache = self.cache & ((1 << self.bits_cached) - 1) + new_bits = bits - self.bits_cached + self.bits_cached = 8 * feed_size - new_bits + result = (feed_int >> self.bits_cached) | (cache << new_bits) + self.cache = feed_int + + self.bit_position += bits + return result + + def read_bytes(self, count: int): + if self.bit_position + 8 * count > 8 * len(self.data): + raise ValueError('not enough data') + + if self.bit_position % 8: + # Not byte aligned + result = bytearray(count) + for i in range(count): + result[i] = self.read(8) + return bytes(result) + + # Byte aligned + self.byte_position = self.bit_position // 8 + self.bits_cached = 0 + self.cache = 0 + offset = self.bit_position // 8 + self.bit_position += 8 * count + return self.data[offset : offset + count] + + def bits_left(self) -> int: + return (8 * len(self.data)) - self.bit_position + + def skip(self, bits: int) -> None: + # Slow, but simple... 
+ while bits: + if bits > 32: + self.read(32) + bits -= 32 + else: + self.read(bits) + break + + +# ----------------------------------------------------------------------------- +class AacAudioRtpPacket: + """AAC payload encapsulated in an RTP packet payload""" + + @staticmethod + def latm_value(reader: BitReader) -> int: + bytes_for_value = reader.read(2) + value = 0 + for _ in range(bytes_for_value + 1): + value = value * 256 + reader.read(8) + return value + + @staticmethod + def program_config_element(reader: BitReader): + raise ValueError('program_config_element not supported') + + @dataclass + class GASpecificConfig: + def __init__( + self, reader: BitReader, channel_configuration: int, audio_object_type: int + ) -> None: + # GASpecificConfig - ISO/EIC 14496-3 Table 4.1 + frame_length_flag = reader.read(1) + depends_on_core_coder = reader.read(1) + if depends_on_core_coder: + self.core_coder_delay = reader.read(14) + extension_flag = reader.read(1) + if not channel_configuration: + AacAudioRtpPacket.program_config_element(reader) + if audio_object_type in (6, 20): + self.layer_nr = reader.read(3) + if extension_flag: + if audio_object_type == 22: + num_of_sub_frame = reader.read(5) + layer_length = reader.read(11) + if audio_object_type in (17, 19, 20, 23): + aac_section_data_resilience_flags = reader.read(1) + aac_scale_factor_data_resilience_flags = reader.read(1) + aac_spectral_data_resilience_flags = reader.read(1) + extension_flag_3 = reader.read(1) + if extension_flag_3 == 1: + raise ValueError('extensionFlag3 == 1 not supported') + + @staticmethod + def audio_object_type(reader: BitReader): + # GetAudioObjectType - ISO/EIC 14496-3 Table 1.16 + audio_object_type = reader.read(5) + if audio_object_type == 31: + audio_object_type = 32 + reader.read(6) + + return audio_object_type + + @dataclass + class AudioSpecificConfig: + audio_object_type: int + sampling_frequency_index: int + sampling_frequency: int + channel_configuration: int + sbr_present_flag: 
int + ps_present_flag: int + extension_audio_object_type: int + extension_sampling_frequency_index: int + extension_sampling_frequency: int + extension_channel_configuration: int + + SAMPLING_FREQUENCIES = [ + 96000, + 88200, + 64000, + 48000, + 44100, + 32000, + 24000, + 22050, + 16000, + 12000, + 11025, + 8000, + 7350, + ] + + def __init__(self, reader: BitReader) -> None: + # AudioSpecificConfig - ISO/EIC 14496-3 Table 1.15 + self.audio_object_type = AacAudioRtpPacket.audio_object_type(reader) + self.sampling_frequency_index = reader.read(4) + if self.sampling_frequency_index == 0xF: + self.sampling_frequency = reader.read(24) + else: + self.sampling_frequency = self.SAMPLING_FREQUENCIES[ + self.sampling_frequency_index + ] + self.channel_configuration = reader.read(4) + self.sbr_present_flag = -1 + self.ps_present_flag = -1 + if self.audio_object_type in (5, 29): + self.extension_audio_object_type = 5 + self.sbc_present_flag = 1 + if self.audio_object_type == 29: + self.ps_present_flag = 1 + self.extension_sampling_frequency_index = reader.read(4) + if self.extension_sampling_frequency_index == 0xF: + self.extension_sampling_frequency = reader.read(24) + else: + self.extension_sampling_frequency = self.SAMPLING_FREQUENCIES[ + self.extension_sampling_frequency_index + ] + self.audio_object_type = AacAudioRtpPacket.audio_object_type(reader) + if self.audio_object_type == 22: + self.extension_channel_configuration = reader.read(4) + else: + self.extension_audio_object_type = 0 + + if self.audio_object_type in (1, 2, 3, 4, 6, 7, 17, 19, 20, 21, 22, 23): + ga_specific_config = AacAudioRtpPacket.GASpecificConfig( + reader, self.channel_configuration, self.audio_object_type + ) + else: + raise ValueError( + f'audioObjectType {self.audio_object_type} not supported' + ) + + # if self.extension_audio_object_type != 5 and bits_to_decode >= 16: + # sync_extension_type = reader.read(11) + # if sync_extension_type == 0x2B7: + # self.extension_audio_object_type = 
AacAudioRtpPacket.audio_object_type(reader) + # if self.extension_audio_object_type == 5: + # self.sbr_present_flag = reader.read(1) + # if self.sbr_present_flag: + # self.extension_sampling_frequency_index = reader.read(4) + # if self.extension_sampling_frequency_index == 0xF: + # self.extension_sampling_frequency = reader.read(24) + # else: + # self.extension_sampling_frequency = self.SAMPLING_FREQUENCIES[self.extension_sampling_frequency_index] + # if bits_to_decode >= 12: + # sync_extension_type = reader.read(11) + # if sync_extension_type == 0x548: + # self.ps_present_flag = reader.read(1) + # elif self.extension_audio_object_type == 22: + # self.sbr_present_flag = reader.read(1) + # if self.sbr_present_flag: + # self.extension_sampling_frequency_index = reader.read(4) + # if self.extension_sampling_frequency_index == 0xF: + # self.extension_sampling_frequency = reader.read(24) + # else: + # self.extension_sampling_frequency = self.SAMPLING_FREQUENCIES[self.extension_sampling_frequency_index] + # self.extension_channel_configuration = reader.read(4) + + @dataclass + class StreamMuxConfig: + other_data_present: int + other_data_len_bits: int + audio_specific_config: AacAudioRtpPacket.AudioSpecificConfig + + def __init__(self, reader: BitReader) -> None: + # StreamMuxConfig - ISO/EIC 14496-3 Table 1.42 + audio_mux_version = reader.read(1) + if audio_mux_version == 1: + audio_mux_version_a = reader.read(1) + else: + audio_mux_version_a = 0 + if audio_mux_version_a != 0: + raise ValueError('audioMuxVersionA != 0 not supported') + if audio_mux_version == 1: + tara_buffer_fullness = AacAudioRtpPacket.latm_value(reader) + stream_cnt = 0 + all_streams_same_time_framing = reader.read(1) + num_sub_frames = reader.read(6) + num_program = reader.read(4) + if num_program != 0: + raise ValueError('num_program != 0 not supported') + num_layer = reader.read(3) + if num_layer != 0: + raise ValueError('num_layer != 0 not supported') + if audio_mux_version == 0: + 
self.audio_specific_config = AacAudioRtpPacket.AudioSpecificConfig( + reader + ) + else: + asc_len = AacAudioRtpPacket.latm_value(reader) + marker = reader.bit_position + self.audio_specific_config = AacAudioRtpPacket.AudioSpecificConfig( + reader + ) + audio_specific_config_len = reader.bit_position - marker + if asc_len < audio_specific_config_len: + raise ValueError('audio_specific_config_len > asc_len') + asc_len -= audio_specific_config_len + reader.skip(asc_len) + frame_length_type = reader.read(3) + if frame_length_type == 0: + latm_buffer_fullness = reader.read(8) + elif frame_length_type == 1: + frame_length = reader.read(9) + else: + raise ValueError(f'frame_length_type {frame_length_type} not supported') + + self.other_data_present = reader.read(1) + if self.other_data_present: + if audio_mux_version == 1: + self.other_data_len_bits = AacAudioRtpPacket.latm_value(reader) + else: + self.other_data_len_bits = 0 + while True: + self.other_data_len_bits *= 256 + other_data_len_esc = reader.read(1) + self.other_data_len_bits += reader.read(8) + if other_data_len_esc == 0: + break + crc_check_present = reader.read(1) + if crc_check_present: + crc_checksum = reader.read(8) + + @dataclass + class AudioMuxElement: + payload: bytes + stream_mux_config: AacAudioRtpPacket.StreamMuxConfig + + def __init__(self, reader: BitReader, mux_config_present: int): + if mux_config_present == 0: + raise ValueError('muxConfigPresent == 0 not supported') + + # AudioMuxElement - ISO/EIC 14496-3 Table 1.41 + use_same_stream_mux = reader.read(1) + if use_same_stream_mux: + raise ValueError('useSameStreamMux == 1 not supported') + self.stream_mux_config = AacAudioRtpPacket.StreamMuxConfig(reader) + + # We only support: + # allStreamsSameTimeFraming == 1 + # audioMuxVersionA == 0, + # numProgram == 0 + # numSubFrames == 0 + # numLayer == 0 + + mux_slot_length_bytes = 0 + while True: + tmp = reader.read(8) + mux_slot_length_bytes += tmp + if tmp != 255: + break + + self.payload = 
reader.read_bytes(mux_slot_length_bytes) + + if self.stream_mux_config.other_data_present: + reader.skip(self.stream_mux_config.other_data_len_bits) + + # ByteAlign + while reader.bit_position % 8: + reader.read(1) + + def __init__(self, data: bytes) -> None: + # Parse the bit stream + reader = BitReader(data) + self.audio_mux_element = self.AudioMuxElement(reader, mux_config_present=1) + + def to_adts(self): + # pylint: disable=line-too-long + sampling_frequency_index = ( + self.audio_mux_element.stream_mux_config.audio_specific_config.sampling_frequency_index + ) + channel_configuration = ( + self.audio_mux_element.stream_mux_config.audio_specific_config.channel_configuration + ) + frame_size = len(self.audio_mux_element.payload) + return ( + bytes( + [ + 0xFF, + 0xF1, # 0xF9 (MPEG2) + 0x40 + | (sampling_frequency_index << 2) + | (channel_configuration >> 2), + ((channel_configuration & 0x3) << 6) | ((frame_size + 7) >> 11), + ((frame_size + 7) >> 3) & 0xFF, + (((frame_size + 7) << 5) & 0xFF) | 0x1F, + 0xFC, + ] + ) + + self.audio_mux_element.payload + ) diff --git a/setup.cfg b/setup.cfg index 60aca27d..52c08185 100644 --- a/setup.cfg +++ b/setup.cfg @@ -30,21 +30,22 @@ package_dir = bumble.apps = apps include-package-data = True install_requires = + aiohttp >= 22.1.0; platform_system!='Emscripten' appdirs >= 1.4 click >= 7.1.2; platform_system!='Emscripten' cryptography == 35; platform_system!='Emscripten' grpcio >= 1.46; platform_system!='Emscripten' + humanize >= 4.6.0 libusb1 >= 2.0.1; platform_system!='Emscripten' libusb-package == 1.0.26.1; platform_system!='Emscripten' prompt_toolkit >= 3.0.16; platform_system!='Emscripten' + prettytable >= 3.6.0 protobuf >= 3.12.4 pyee >= 8.2.2 pyserial-asyncio >= 0.5; platform_system!='Emscripten' pyserial >= 3.5; platform_system!='Emscripten' pyusb >= 1.2; platform_system!='Emscripten' websockets >= 8.1; platform_system!='Emscripten' - prettytable >= 3.6.0 - humanize >= 4.6.0 [options.entry_points] console_scripts = 
diff --git a/speaker.html b/speaker.html new file mode 100644 index 00000000..05cc31f8 --- /dev/null +++ b/speaker.html @@ -0,0 +1,28 @@ + + + + Audio WAV Player + + +

Audio WAV Player

+ + + + + diff --git a/tests/codecs_test.py b/tests/codecs_test.py new file mode 100644 index 00000000..faf9df63 --- /dev/null +++ b/tests/codecs_test.py @@ -0,0 +1,64 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import pytest
from bumble.codecs import AacAudioRtpPacket, BitReader


# -----------------------------------------------------------------------------
def test_reader():
    """Unit tests for BitReader: error cases and bit-exact reassembly."""
    # Reading from empty data must fail.
    reader = BitReader(b'')
    with pytest.raises(ValueError):
        reader.read(1)

    # Reading more bits than the data holds must fail.
    reader = BitReader(b'hello')
    with pytest.raises(ValueError):
        reader.read(40)

    # After consuming 1 of 8 bits, a 10-bit read overruns and must fail.
    reader = BitReader(bytes([0xFF]))
    assert reader.read(1) == 1
    with pytest.raises(ValueError):
        reader.read(10)

    # Reading one bit at a time (MSB first) reassembles the original byte.
    reader = BitReader(bytes([0x78]))
    value = 0
    for _ in range(8):
        value = (value << 1) | reader.read(1)
    assert value == 0x78

    # Reading with every width from 1 to 32 bits, 100 times over, must
    # reassemble the data exactly (66 bytes = sum(1..32) bits per pass).
    data = bytes([x & 0xFF for x in range(66 * 100)])
    reader = BitReader(data)
    value = 0
    for _ in range(100):
        for bits in range(1, 33):
            value = value << bits | reader.read(bits)
    assert value == int.from_bytes(data, byteorder='big')


def test_aac_rtp():
    """Decode a captured LATM/RTP AAC payload and check its ADTS conversion."""
    # pylint: disable=line-too-long
    packet_data = bytes.fromhex('47fc0000b090800300202066000198000de120000000000000000000000000000000000000000000001c')
    packet = AacAudioRtpPacket(packet_data)
    adts = packet.to_adts()
    # Expected: 7-byte ADTS header followed by the raw AAC frame.
    assert adts == bytes.fromhex('fff1508004fffc2066000198000de120000000000000000000000000000000000000000000001c')


# -----------------------------------------------------------------------------
if __name__ == '__main__':
    test_reader()
    test_aac_rtp()