forked from auracaster/bumble_mirror
Squashed commits: [90f2024] fix import order [0edd321] add a few docstrings [77a0ac0] wip [adcf159] wip [96cbd67] wip [d8bfbab] wip (+1 squashed commit) Squashed commits: [43b4d66] wip (+2 squashed commits) Squashed commits: [3dafaa8] wip [5844026] wip (+1 squashed commit) Squashed commits: [4cbb35a] wip (+1 squashed commit) Squashed commits: [4d2b6d3] wip (+4 squashed commits) Squashed commits: [f2da510] wip [318c119] wip [923b4eb] wip [9d46365] wip use a dict instead of a series of ifs (+6 squashed commits) Squashed commits: [90f2024] fix import order [0edd321] add a few docstrings [77a0ac0] wip [adcf159] wip [96cbd67] wip [d8bfbab] wip
247 lines
8.5 KiB
Python
247 lines
8.5 KiB
Python
# Copyright 2023 Google LLC
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# https://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Imports
|
|
# -----------------------------------------------------------------------------
|
|
import asyncio
|
|
import struct
|
|
|
|
import pytest
|
|
|
|
from bumble import core
|
|
from bumble import device
|
|
from bumble import host
|
|
from bumble import controller
|
|
from bumble import link
|
|
from bumble import avc
|
|
from bumble import avrcp
|
|
from bumble import avctp
|
|
from bumble.transport import common
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
class TwoDevices:
|
|
def __init__(self):
|
|
self.connections = [None, None]
|
|
|
|
addresses = ['F0:F1:F2:F3:F4:F5', 'F5:F4:F3:F2:F1:F0']
|
|
self.link = link.LocalLink()
|
|
self.controllers = [
|
|
controller.Controller('C1', link=self.link, public_address=addresses[0]),
|
|
controller.Controller('C2', link=self.link, public_address=addresses[1]),
|
|
]
|
|
self.devices = [
|
|
device.Device(
|
|
address=addresses[0],
|
|
host=host.Host(
|
|
self.controllers[0], common.AsyncPipeSink(self.controllers[0])
|
|
),
|
|
),
|
|
device.Device(
|
|
address=addresses[1],
|
|
host=host.Host(
|
|
self.controllers[1], common.AsyncPipeSink(self.controllers[1])
|
|
),
|
|
),
|
|
]
|
|
self.devices[0].classic_enabled = True
|
|
self.devices[1].classic_enabled = True
|
|
self.connections = [None, None]
|
|
self.protocols = [None, None]
|
|
|
|
def on_connection(self, which, connection):
|
|
self.connections[which] = connection
|
|
|
|
async def setup_connections(self):
|
|
await self.devices[0].power_on()
|
|
await self.devices[1].power_on()
|
|
|
|
self.connections = await asyncio.gather(
|
|
self.devices[0].connect(
|
|
self.devices[1].public_address, core.BT_BR_EDR_TRANSPORT
|
|
),
|
|
self.devices[1].accept(self.devices[0].public_address),
|
|
)
|
|
|
|
self.protocols = [avrcp.Protocol(), avrcp.Protocol()]
|
|
self.protocols[0].listen(self.devices[1])
|
|
await self.protocols[1].connect(self.connections[0])
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
def test_frame_parser():
|
|
with pytest.raises(ValueError) as error:
|
|
avc.Frame.from_bytes(bytes.fromhex("11480000"))
|
|
|
|
x = bytes.fromhex("014D0208")
|
|
frame = avc.Frame.from_bytes(x)
|
|
assert frame.subunit_type == avc.Frame.SubunitType.PANEL
|
|
assert frame.subunit_id == 7
|
|
assert frame.opcode == 8
|
|
|
|
x = bytes.fromhex("014DFF0108")
|
|
frame = avc.Frame.from_bytes(x)
|
|
assert frame.subunit_type == avc.Frame.SubunitType.PANEL
|
|
assert frame.subunit_id == 260
|
|
assert frame.opcode == 8
|
|
|
|
x = bytes.fromhex("0148000019581000000103")
|
|
|
|
frame = avc.Frame.from_bytes(x)
|
|
|
|
assert isinstance(frame, avc.CommandFrame)
|
|
assert frame.ctype == avc.CommandFrame.CommandType.STATUS
|
|
assert frame.subunit_type == avc.Frame.SubunitType.PANEL
|
|
assert frame.subunit_id == 0
|
|
assert frame.opcode == 0
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
def test_vendor_dependent_command():
|
|
x = bytes.fromhex("0148000019581000000103")
|
|
frame = avc.Frame.from_bytes(x)
|
|
assert isinstance(frame, avc.VendorDependentCommandFrame)
|
|
assert frame.company_id == 0x1958
|
|
assert frame.vendor_dependent_data == bytes.fromhex("1000000103")
|
|
|
|
frame = avc.VendorDependentCommandFrame(
|
|
avc.CommandFrame.CommandType.STATUS,
|
|
avc.Frame.SubunitType.PANEL,
|
|
0,
|
|
0x1958,
|
|
bytes.fromhex("1000000103"),
|
|
)
|
|
assert bytes(frame) == x
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
def test_avctp_message_assembler():
|
|
received_message = []
|
|
|
|
def on_message(transaction_label, is_response, ipid, pid, payload):
|
|
received_message.append((transaction_label, is_response, ipid, pid, payload))
|
|
|
|
assembler = avctp.MessageAssembler(on_message)
|
|
|
|
payload = bytes.fromhex("01")
|
|
assembler.on_pdu(bytes([1 << 4 | 0b00 << 2 | 1 << 1 | 0, 0x11, 0x22]) + payload)
|
|
assert received_message
|
|
assert received_message[0] == (1, False, False, 0x1122, payload)
|
|
|
|
received_message = []
|
|
payload = bytes.fromhex("010203")
|
|
assembler.on_pdu(bytes([1 << 4 | 0b01 << 2 | 1 << 1 | 0, 0x11, 0x22]) + payload)
|
|
assert len(received_message) == 0
|
|
assembler.on_pdu(bytes([1 << 4 | 0b00 << 2 | 1 << 1 | 0, 0x11, 0x22]) + payload)
|
|
assert received_message
|
|
assert received_message[0] == (1, False, False, 0x1122, payload)
|
|
|
|
received_message = []
|
|
payload = bytes.fromhex("010203")
|
|
assembler.on_pdu(
|
|
bytes([1 << 4 | 0b01 << 2 | 1 << 1 | 0, 3, 0x11, 0x22]) + payload[0:1]
|
|
)
|
|
assembler.on_pdu(
|
|
bytes([1 << 4 | 0b10 << 2 | 1 << 1 | 0, 0x11, 0x22]) + payload[1:2]
|
|
)
|
|
assembler.on_pdu(
|
|
bytes([1 << 4 | 0b11 << 2 | 1 << 1 | 0, 0x11, 0x22]) + payload[2:3]
|
|
)
|
|
assert received_message
|
|
assert received_message[0] == (1, False, False, 0x1122, payload)
|
|
|
|
# received_message = []
|
|
# parameter = bytes.fromhex("010203")
|
|
# assembler.on_pdu(struct.pack(">BBH", 0x10, 0b11, len(parameter)) + parameter)
|
|
# assert len(received_message) == 0
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
def test_avrcp_pdu_assembler():
|
|
received_pdus = []
|
|
|
|
def on_pdu(pdu_id, parameter):
|
|
received_pdus.append((pdu_id, parameter))
|
|
|
|
assembler = avrcp.PduAssembler(on_pdu)
|
|
|
|
parameter = bytes.fromhex("01")
|
|
assembler.on_pdu(struct.pack(">BBH", 0x10, 0b00, len(parameter)) + parameter)
|
|
assert received_pdus
|
|
assert received_pdus[0] == (0x10, parameter)
|
|
|
|
received_pdus = []
|
|
parameter = bytes.fromhex("010203")
|
|
assembler.on_pdu(struct.pack(">BBH", 0x10, 0b01, len(parameter)) + parameter)
|
|
assert len(received_pdus) == 0
|
|
assembler.on_pdu(struct.pack(">BBH", 0x10, 0b00, len(parameter)) + parameter)
|
|
assert received_pdus
|
|
assert received_pdus[0] == (0x10, parameter)
|
|
|
|
received_pdus = []
|
|
parameter = bytes.fromhex("010203")
|
|
assembler.on_pdu(struct.pack(">BBH", 0x10, 0b01, 1) + parameter[0:1])
|
|
assembler.on_pdu(struct.pack(">BBH", 0x10, 0b10, 1) + parameter[1:2])
|
|
assembler.on_pdu(struct.pack(">BBH", 0x10, 0b11, 1) + parameter[2:3])
|
|
assert received_pdus
|
|
assert received_pdus[0] == (0x10, parameter)
|
|
|
|
received_pdus = []
|
|
parameter = bytes.fromhex("010203")
|
|
assembler.on_pdu(struct.pack(">BBH", 0x10, 0b11, len(parameter)) + parameter)
|
|
assert len(received_pdus) == 0
|
|
|
|
|
|
def test_passthrough_commands():
|
|
play_pressed = avc.PassThroughCommandFrame(
|
|
avc.CommandFrame.CommandType.CONTROL,
|
|
avc.CommandFrame.SubunitType.PANEL,
|
|
0,
|
|
avc.PassThroughCommandFrame.StateFlag.PRESSED,
|
|
avc.PassThroughCommandFrame.OperationId.PLAY,
|
|
b'',
|
|
)
|
|
|
|
play_pressed_bytes = bytes(play_pressed)
|
|
parsed = avc.Frame.from_bytes(play_pressed_bytes)
|
|
assert isinstance(parsed, avc.PassThroughCommandFrame)
|
|
assert parsed.operation_id == avc.PassThroughCommandFrame.OperationId.PLAY
|
|
assert bytes(parsed) == play_pressed_bytes
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
@pytest.mark.asyncio
|
|
async def test_get_supported_events():
|
|
two_devices = TwoDevices()
|
|
await two_devices.setup_connections()
|
|
|
|
supported_events = await two_devices.protocols[0].get_supported_events()
|
|
assert supported_events == []
|
|
|
|
delegate1 = avrcp.Delegate([avrcp.EventId.VOLUME_CHANGED])
|
|
two_devices.protocols[0].delegate = delegate1
|
|
supported_events = await two_devices.protocols[1].get_supported_events()
|
|
assert supported_events == [avrcp.EventId.VOLUME_CHANGED]
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
if __name__ == '__main__':
|
|
test_frame_parser()
|
|
test_vendor_dependent_command()
|
|
test_avctp_message_assembler()
|
|
test_avrcp_pdu_assembler()
|
|
test_passthrough_commands()
|
|
test_get_supported_events()
|