feature/precode #2
@@ -38,7 +38,6 @@ class Multicaster:
|
||||
'is_streaming': streaming,
|
||||
}
|
||||
|
||||
|
||||
async def init_broadcast(self):
|
||||
self.device_acm = multicast.create_device(self.global_conf)
|
||||
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import requests
|
||||
from auracast.utils.read_lc3_file import read_lc3_file
|
||||
|
||||
BASE_URL = "http://127.0.0.1:5000" # Adjust based on your actual API URL
|
||||
|
||||
@@ -15,11 +16,12 @@ def stop_audio():
|
||||
return response.json()
|
||||
|
||||
def send_audio():
|
||||
audio_data = {
|
||||
"broadcast_de": "test_audio_data_de",
|
||||
"broadcast_fr": "test_audio_data_fr"
|
||||
test_audio_data = { # TODO: investigate further whats the best way to actually transfer the data
|
||||
"broadcast_de": read_lc3_file('src/auracast/testdata/announcement_de_10_16_32.lc3').decode('latin-1'),
|
||||
"broadcast_en": read_lc3_file('src/auracast/testdata/announcement_en_10_16_32.lc3').decode('latin-1')
|
||||
}
|
||||
response = requests.post(f"{BASE_URL}/stream_lc3", json=audio_data)
|
||||
|
||||
response = requests.post(f"{BASE_URL}/stream_lc3", json=test_audio_data)
|
||||
return response.json()
|
||||
|
||||
def get_status():
|
||||
@@ -27,8 +29,8 @@ def get_status():
|
||||
return response.json()
|
||||
|
||||
if __name__ == "__main__":
|
||||
print("Getting status:", get_status())
|
||||
print("Initializing server:", initialize())
|
||||
print("Getting status:", get_status())
|
||||
print("Sending audio:", send_audio())
|
||||
print("Getting status:", get_status())
|
||||
print("Stopping audio:", stop_audio())
|
||||
print("Shutting down:", shutdown())
|
||||
@@ -69,7 +69,7 @@ async def send_audio():
|
||||
try:
|
||||
for key, val in big_conf.items():
|
||||
if key in post_data:
|
||||
val.audio_source = post_data['key']
|
||||
val.audio_source = post_data[key].encode('latin-1')
|
||||
else:
|
||||
val.audio_source = b''
|
||||
|
||||
@@ -78,6 +78,7 @@ async def send_audio():
|
||||
return jsonify({"status": "audio_sent"}), 200
|
||||
except Exception as e:
|
||||
return jsonify({"error": str(e)}), 500
|
||||
|
||||
# TODO: Also a queue should be implemented - probably as its own endpoint,
|
||||
|
||||
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
import pytest
|
||||
import pytest_asyncio
|
||||
import asyncio
|
||||
from quart import jsonify
|
||||
from auracast.utils.read_lc3_file import read_lc3_file
|
||||
from auracast import multicast_server
|
||||
|
||||
@pytest.fixture
|
||||
@@ -8,10 +10,8 @@ def client():
|
||||
"""Fixture to create and return a Quart test client."""
|
||||
yield multicast_server.app.test_client()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_initialize(client):
|
||||
"""Tests the /init endpoint."""
|
||||
#client = multicast_server.app.test_client()
|
||||
@pytest_asyncio.fixture
|
||||
async def init_broadcast(client):
|
||||
response = await client.post('/init')
|
||||
json_data = await response.get_json()
|
||||
|
||||
@@ -20,8 +20,9 @@ async def test_initialize(client):
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_shutdown(client):
|
||||
async def test_shutdown(client, init_broadcast):
|
||||
"""Tests the /shutdown endpoint."""
|
||||
|
||||
response = await client.post('/shutdown')
|
||||
json_data = await response.get_json()
|
||||
|
||||
@@ -30,8 +31,9 @@ async def test_shutdown(client):
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_stop_audio(client):
|
||||
async def test_stop_audio(client, init_broadcast):
|
||||
"""Tests the /stop_audio endpoint."""
|
||||
|
||||
response = await client.post('/stop_audio')
|
||||
json_data = await response.get_json()
|
||||
|
||||
@@ -40,11 +42,11 @@ async def test_stop_audio(client):
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_audio(client):
|
||||
async def test_send_audio(client, init_broadcast):
|
||||
"""Tests the /stream_lc3 endpoint."""
|
||||
test_audio_data = {
|
||||
"broadcast_de": read_lc3_file(big.audio_source),
|
||||
"broadcast_fr": b"test_audio_data_fr"
|
||||
test_audio_data = { # TODO: investigate further whats the best way to actually transfer the data
|
||||
"broadcast_de": read_lc3_file('src/auracast/testdata/announcement_de_10_16_32.lc3').decode('latin-1'),
|
||||
"broadcast_en": read_lc3_file('src/auracast/testdata/announcement_en_10_16_32.lc3').decode('latin-1')
|
||||
}
|
||||
|
||||
response = await client.post('/stream_lc3', json=test_audio_data)
|
||||
@@ -53,10 +55,11 @@ async def test_send_audio(client):
|
||||
assert response.status_code == 200
|
||||
assert json_data["status"] == "audio_sent"
|
||||
|
||||
await asyncio.wait([multicast_server.multicaster.streamer.task], timeout=10)
|
||||
# Ensure the audio data is correctly assigned
|
||||
for key, val in multicast_server.big_conf.items():
|
||||
if key in test_audio_data:
|
||||
assert val.audio_source == test_audio_data[key]
|
||||
assert val.audio_source == test_audio_data[key].encode('latin-1')
|
||||
else:
|
||||
assert val.audio_source == b""
|
||||
|
||||
|
||||
Reference in New Issue
Block a user