mirror of
https://gitlab.freedesktop.org/pipewire/pipewire.git
synced 2026-05-13 23:50:29 -04:00
test: add VM-based full-stack bluetooth tests
Add tests that check PipeWire <-> PipeWire bluetooth audio streaming for A2DP, BAP, HFP. The tests use Qemu VMs and don't require Bluetooth support from HW / kernel. Full VM images are not required; similarly to BlueZ kernel tester these use (read-only) mount of host filesystem. A monolithic kernel image with suitable config is required. The bluetoothd binary installed on host is used if found; otherwise tests are skipped. These test depend on https://github.com/pv/pytest-bluezenv which manages the VM setup. To launch: python3 -m pip install pytest-bluezenv meson devenv -C builddir -w . python3 -m pytest test --kernel-build -v which also builds a kernel image with required options.
This commit is contained in:
parent
708cb6e2e8
commit
d12367e10e
4 changed files with 680 additions and 0 deletions
0
test/bluezenv/__init__.py
Normal file
0
test/bluezenv/__init__.py
Normal file
95
test/bluezenv/conftest.py
Normal file
95
test/bluezenv/conftest.py
Normal file
|
|
@ -0,0 +1,95 @@
|
|||
# -*- coding: utf-8; mode: python; eval: (blacken-mode); -*-
|
||||
# SPDX-FileCopyrightText: Copyright © 2026 Pauli Virtanen
|
||||
# SPDX-License-Identifier: MIT
|
||||
|
||||
import pytest
|
||||
import warnings
|
||||
|
||||
from pytest_bluezenv import Bluetoothd
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def paired_hosts(hosts, host_setup):
|
||||
"""
|
||||
Provide two hosts, paired with each other, and the first one Central
|
||||
"""
|
||||
|
||||
le = any(
|
||||
"ControllerMode = le" in (p.conf or "")
|
||||
for plugins in host_setup["setup"]
|
||||
for p in plugins
|
||||
if isinstance(p, Bluetoothd)
|
||||
)
|
||||
|
||||
if le:
|
||||
yield from _pair_le(hosts)
|
||||
else:
|
||||
yield from _pair_bredr(hosts)
|
||||
|
||||
|
||||
def _pair_bredr(hosts):
|
||||
host0, host1 = hosts
|
||||
|
||||
host0.bluetoothctl.send("scan on\n")
|
||||
host0.bluetoothctl.expect(f"Controller {host0.bdaddr.upper()} Discovering: yes")
|
||||
|
||||
host1.bluetoothctl.send("pairable on\n")
|
||||
host1.bluetoothctl.expect("Changing pairable on succeeded")
|
||||
host1.bluetoothctl.send("discoverable on\n")
|
||||
host1.bluetoothctl.expect(f"Controller {host1.bdaddr.upper()} Discoverable: yes")
|
||||
|
||||
host0.bluetoothctl.expect(f"Device {host1.bdaddr.upper()}")
|
||||
host0.bluetoothctl.send(f"pair {host1.bdaddr}\n")
|
||||
|
||||
idx, m = host0.bluetoothctl.expect(r"Confirm passkey (\d+).*:")
|
||||
key = m[0].decode("utf-8")
|
||||
|
||||
host1.bluetoothctl.expect(f"Confirm passkey {key}")
|
||||
|
||||
host0.bluetoothctl.send("yes\n")
|
||||
host1.bluetoothctl.send("yes\n")
|
||||
|
||||
host0.bluetoothctl.expect("Pairing successful")
|
||||
|
||||
yield hosts
|
||||
|
||||
|
||||
def _pair_le(hosts):
|
||||
host0, host1 = hosts
|
||||
|
||||
host0.bluetoothctl.send("scan on\n")
|
||||
host0.bluetoothctl.expect(f"Controller {host0.bdaddr.upper()} Discovering: yes")
|
||||
|
||||
host1.bluetoothctl.send("advertise on\n")
|
||||
host1.bluetoothctl.expect("Advertising object registered")
|
||||
|
||||
host0.bluetoothctl.expect(f"Device {host1.bdaddr.upper()}")
|
||||
host0.bluetoothctl.send(f"pair {host1.bdaddr.upper()}\n")
|
||||
|
||||
# BUG!: if controller is power cycled off/on at boot (before bluetoothd)
|
||||
# BUG!: which is what the tester here does,
|
||||
# BUG!: bluetoothd MGMT command to enable Secure Connections Host Support
|
||||
# BUG!: fails and we are left with legacy passkey. It seems we get randomly
|
||||
# BUG!: one of these depending on what state controller/kernel were before
|
||||
# BUG!: btmgmt power off/on
|
||||
|
||||
idx, m = host0.bluetoothctl.expect(
|
||||
[r"\[agent\].*Passkey:.*m(\d+)", r"Confirm passkey (\d+).*:"]
|
||||
)
|
||||
key = m[0].decode("utf-8")
|
||||
|
||||
if idx == 0:
|
||||
warnings.warn(
|
||||
"BUG: we got passkey authentication, bluetoothd/kernel should be fixed"
|
||||
)
|
||||
host1.bluetoothctl.expect(r"\[agent\] Enter passkey \(number in 0-999999\):")
|
||||
host1.bluetoothctl.send(f"{key}\n")
|
||||
else:
|
||||
host1.bluetoothctl.expect(f"Confirm passkey {key}")
|
||||
|
||||
host0.bluetoothctl.send("yes\n")
|
||||
host1.bluetoothctl.send("yes\n")
|
||||
|
||||
host0.bluetoothctl.expect("Pairing successful")
|
||||
|
||||
yield hosts
|
||||
554
test/bluezenv/test_streaming.py
Normal file
554
test/bluezenv/test_streaming.py
Normal file
|
|
@ -0,0 +1,554 @@
|
|||
# -*- coding: utf-8; mode: python; eval: (blacken-mode); -*-
|
||||
# SPDX-FileCopyrightText: Copyright © 2026 Pauli Virtanen
|
||||
# SPDX-License-Identifier: MIT
|
||||
"""
|
||||
Tests for PipeWire audio streaming
|
||||
|
||||
To use uninstalled version of PipeWire, run the tests in PipeWire
|
||||
devenv::
|
||||
|
||||
meson devenv -C ../pipewire/builddir -w . python3 -mpytest test/bluezenv -v
|
||||
"""
|
||||
import sys
|
||||
import os
|
||||
import re
|
||||
import pytest
|
||||
import subprocess
|
||||
import tempfile
|
||||
import time
|
||||
import logging
|
||||
import json
|
||||
import dbus
|
||||
import threading
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from pytest_bluezenv import (
|
||||
HostPlugin,
|
||||
host_config,
|
||||
find_exe,
|
||||
Bluetoothd,
|
||||
Bluetoothctl,
|
||||
DbusSession,
|
||||
LogStream,
|
||||
wait_until,
|
||||
mainloop_wrap,
|
||||
)
|
||||
|
||||
pytestmark = [pytest.mark.vm]
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
# Use larger VM instances in case ASAN is enabled
|
||||
VM_MEM = "512M"
|
||||
|
||||
|
||||
class PipeWire(HostPlugin):
|
||||
"""
|
||||
Launch PipeWire in VM instance
|
||||
"""
|
||||
|
||||
name = "pipewire"
|
||||
depends = [DbusSession(), Bluetoothd()]
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
uuids=(
|
||||
"0000110a-0000-1000-8000-00805f9b34fb",
|
||||
"0000110b-0000-1000-8000-00805f9b34fb",
|
||||
),
|
||||
roles="a2dp_sink a2dp_source",
|
||||
config=None,
|
||||
):
|
||||
self.uuids = tuple(uuids)
|
||||
self.roles = str(roles)
|
||||
self.config = config
|
||||
|
||||
# For running PipeWire from build directory
|
||||
self.devenv = {}
|
||||
if os.environ.get("PW_UNINSTALLED"):
|
||||
devenv_keys = [
|
||||
"WIREPLUMBER_MODULE_DIR",
|
||||
"WIREPLUMBER_CONFIG_DIR",
|
||||
"WIREPLUMBER_DATA_DIR",
|
||||
"PIPEWIRE_CONFIG_DIR",
|
||||
"PIPEWIRE_MODULE_DIR",
|
||||
"SPA_PLUGIN_DIR",
|
||||
"SPA_DATA_DIR",
|
||||
"ACP_PATHS_DIR",
|
||||
"ACP_PROFILES_DIR",
|
||||
"GST_PLUGIN_PATH",
|
||||
"ALSA_PLUGIN_DIR",
|
||||
"LD_LIBRARY_PATH",
|
||||
"PW_UNINSTALLED",
|
||||
"PW_BUILDDIR",
|
||||
"PATH",
|
||||
]
|
||||
for key in devenv_keys:
|
||||
value = os.environ.get(key)
|
||||
if value is not None:
|
||||
self.devenv[key] = value
|
||||
|
||||
def presetup(self, config):
|
||||
try:
|
||||
self.exe_pw = find_exe("", "pipewire")
|
||||
self.exe_wp = find_exe("", "wireplumber")
|
||||
self.exe_dump = find_exe("", "pw-dump")
|
||||
self.exe_play = find_exe("", "pw-play")
|
||||
self.exe_record = find_exe("", "pw-record")
|
||||
|
||||
# get versions
|
||||
res = subprocess.run(
|
||||
[self.exe_pw, "--version"], stdout=subprocess.PIPE, encoding="utf-8"
|
||||
)
|
||||
m = re.search("libpipewire ([0-9.]+)", res.stdout)
|
||||
if m:
|
||||
pw_version = tuple(int(x) for x in m.group(1).split("."))
|
||||
else:
|
||||
raise ValueError("pipewire version unknown")
|
||||
|
||||
res = subprocess.run(
|
||||
[self.exe_wp, "--version"], stdout=subprocess.PIPE, encoding="utf-8"
|
||||
)
|
||||
m = re.search("libwireplumber ([0-9.]+)", res.stdout)
|
||||
if m:
|
||||
wp_version = tuple(int(x) for x in m.group(1).split("."))
|
||||
else:
|
||||
raise ValueError("wireplumber version unknown")
|
||||
|
||||
# check versions
|
||||
if pw_version >= (1, 6, 0) and pw_version <= (1, 6, 2):
|
||||
raise ValueError("buggy pipewire version")
|
||||
if pw_version < (1, 4, 9):
|
||||
raise ValueError("pipewire too old")
|
||||
if wp_version < (0, 5, 8):
|
||||
raise ValueError("wireplumber too old")
|
||||
except (FileNotFoundError, ValueError) as exc:
|
||||
pytest.skip(reason=f"PipeWire: {exc}")
|
||||
|
||||
@mainloop_wrap
|
||||
def setup(self, impl):
|
||||
self.play = None
|
||||
self.record = None
|
||||
self.log = logging.getLogger(self.name)
|
||||
|
||||
self.tmpdir = tempfile.TemporaryDirectory(prefix="pipewire-", dir="/run")
|
||||
conf_dir = Path(self.tmpdir.name) / "config"
|
||||
runtime_dir = Path(self.tmpdir.name) / "runtime"
|
||||
state_dir = Path(self.tmpdir.name) / "state"
|
||||
|
||||
dropin_dir = conf_dir / "wireplumber" / "wireplumber.conf.d"
|
||||
wp_conf = dropin_dir / "01-config.conf"
|
||||
wp_extra_conf = dropin_dir / "02-extra-config.conf"
|
||||
|
||||
conf_dir.mkdir()
|
||||
runtime_dir.mkdir()
|
||||
dropin_dir.mkdir(parents=True)
|
||||
state_dir.mkdir()
|
||||
|
||||
self.environ = environ = dict(os.environ)
|
||||
environ.update(self.devenv)
|
||||
|
||||
environ["XDG_CONFIG_HOME"] = str(conf_dir)
|
||||
environ["XDG_STATE_HOME"] = str(runtime_dir)
|
||||
environ["XDG_RUNTIME_HOME"] = str(runtime_dir)
|
||||
environ["PIPEWIRE_RUNTIME_DIR"] = str(runtime_dir)
|
||||
environ["XDG_STATE_HOME"] = str(state_dir)
|
||||
environ["PIPEWIRE_DEBUG"] = "2"
|
||||
environ["WIREPLUMBER_DEBUG"] = (
|
||||
"spa.bluez5.iso:3,spa.bluez5*:4,s-monitors:4,m-lua-scripting:4,s-linking:4,s-device:4"
|
||||
)
|
||||
|
||||
# Handle devenv
|
||||
if "WIREPLUMBER_CONFIG_DIR" in environ:
|
||||
environ["WIREPLUMBER_CONFIG_DIR"] = (
|
||||
environ["WIREPLUMBER_CONFIG_DIR"] + ":" + str(conf_dir / "wireplumber")
|
||||
)
|
||||
|
||||
with open(wp_conf, "w") as f:
|
||||
text = f"""
|
||||
monitor.bluez.properties = {{
|
||||
bluez5.roles = [ {self.roles} ]
|
||||
bluez5.decode-buffer.latency = 4096
|
||||
}}
|
||||
"""
|
||||
f.write(text)
|
||||
|
||||
if self.config is not None:
|
||||
with open(wp_extra_conf, "w") as f:
|
||||
f.write(self.config)
|
||||
|
||||
log.info(f"Starting pipewire: {self.exe_pw}")
|
||||
|
||||
self.logger = LogStream("pipewire")
|
||||
self.pw = subprocess.Popen(
|
||||
self.exe_pw,
|
||||
env=environ,
|
||||
stdout=self.logger.stream,
|
||||
stderr=subprocess.STDOUT,
|
||||
)
|
||||
|
||||
log.info(f"Starting wireplumber: {self.exe_wp}")
|
||||
|
||||
self.wp = subprocess.Popen(
|
||||
self.exe_wp,
|
||||
env=environ,
|
||||
stdout=self.logger.stream,
|
||||
stderr=subprocess.STDOUT,
|
||||
)
|
||||
|
||||
# Wait for PipeWire's bluetooth services
|
||||
log.info("Wait for PipeWire...")
|
||||
bus = dbus.SystemBus()
|
||||
bus.set_exit_on_disconnect(False)
|
||||
adapter = dbus.Interface(
|
||||
bus.get_object("org.bluez", "/org/bluez/hci0"),
|
||||
"org.freedesktop.DBus.Properties",
|
||||
)
|
||||
|
||||
def cond():
|
||||
uuids = [str(uuid) for uuid in adapter.Get("org.bluez.Adapter1", "UUIDs")]
|
||||
return all(uuid in uuids for uuid in self.uuids)
|
||||
|
||||
wait_until(cond)
|
||||
|
||||
os.environ["PIPEWIRE_RUNTIME_DIR"] = str(runtime_dir)
|
||||
|
||||
# Wait for wireplumber session services
|
||||
text = None
|
||||
|
||||
def cond():
|
||||
nonlocal text
|
||||
|
||||
text = self.pw_dump()
|
||||
try:
|
||||
data = json.loads(text)
|
||||
except:
|
||||
return False
|
||||
for item in data:
|
||||
if item.get("type", None) != "PipeWire:Interface:Client":
|
||||
continue
|
||||
if item["info"]["props"]["application.name"] != "WirePlumber":
|
||||
continue
|
||||
if "api.bluez" in item["info"]["props"].get("session.services", ""):
|
||||
return True
|
||||
return False
|
||||
|
||||
try:
|
||||
wait_until(cond)
|
||||
except:
|
||||
raise TimeoutError(f"PipeWire not ready\n{text}")
|
||||
|
||||
log.info("PipeWire ready")
|
||||
|
||||
def pw_dump(self):
|
||||
ret = subprocess.run(
|
||||
[self.exe_dump], stdout=subprocess.PIPE, encoding="utf-8", env=self.environ
|
||||
)
|
||||
|
||||
return ret.stdout
|
||||
|
||||
def pw_play(self):
|
||||
self.play = subprocess.Popen(
|
||||
[
|
||||
self.exe_play,
|
||||
"--raw",
|
||||
"--rate",
|
||||
"4000",
|
||||
"--channels",
|
||||
"1",
|
||||
"--format",
|
||||
"s8",
|
||||
"-",
|
||||
],
|
||||
stdin=subprocess.PIPE,
|
||||
env=self.environ,
|
||||
)
|
||||
self.play_thread = threading.Thread(
|
||||
target=self._play_thread, args=(self.play.stdin,)
|
||||
)
|
||||
self.play_thread.start()
|
||||
|
||||
def _play_thread(self, stream):
|
||||
block = bytes([j % 256 for j in range(4096)])
|
||||
while True:
|
||||
try:
|
||||
stream.write(block)
|
||||
except:
|
||||
self.log.info("pw_play ended")
|
||||
break
|
||||
|
||||
def pw_record(self):
|
||||
self.record = subprocess.Popen(
|
||||
[
|
||||
self.exe_record,
|
||||
"-P",
|
||||
"media.class=Audio/Sink",
|
||||
"--raw",
|
||||
"--format",
|
||||
"s8",
|
||||
"--rate",
|
||||
"4000",
|
||||
"--channels",
|
||||
"1",
|
||||
"-",
|
||||
],
|
||||
stdout=subprocess.PIPE,
|
||||
env=self.environ,
|
||||
)
|
||||
self.record_thread = threading.Thread(
|
||||
target=self._record_thread, args=(self.record.stdout,)
|
||||
)
|
||||
self.record_thread.start()
|
||||
self.record_signal = threading.Event()
|
||||
|
||||
def _record_thread(self, stream):
|
||||
while True:
|
||||
try:
|
||||
block = stream.read(256)
|
||||
if not block:
|
||||
break
|
||||
except:
|
||||
self.log.info("pw_record failed")
|
||||
break
|
||||
|
||||
# If we get anything nonzero, some signal is getting
|
||||
# through. Can't check exactness due to encoding and
|
||||
# possibly heavy underruns in VM environment.
|
||||
if any(list(block)):
|
||||
self.log.info("pw_record signal found")
|
||||
self.record_success = True
|
||||
self.record_signal.set()
|
||||
return
|
||||
else:
|
||||
self.log.debug("pw_record: waiting for signal")
|
||||
|
||||
self.log.error("pw_record: no signal found")
|
||||
self.record_success = False
|
||||
self.record_signal.set()
|
||||
|
||||
def pw_record_wait_signal(self, timeout=160):
|
||||
res = self.record_signal.wait(timeout=timeout)
|
||||
return res and self.record_success
|
||||
|
||||
def teardown(self):
|
||||
log.info("Stop pipewire")
|
||||
self.pw.terminate()
|
||||
self.wp.terminate()
|
||||
if self.play is not None:
|
||||
try:
|
||||
self.play.stdin.close()
|
||||
except BrokenPipeError:
|
||||
pass
|
||||
self.play.terminate()
|
||||
self.play_thread.join()
|
||||
if self.record is not None:
|
||||
try:
|
||||
self.record.stdout.close()
|
||||
except BrokenPipeError:
|
||||
pass
|
||||
self.record.terminate()
|
||||
self.record_thread.join()
|
||||
self.tmpdir.cleanup()
|
||||
|
||||
|
||||
a2dp_host = [Bluetoothctl(), PipeWire(roles="a2dp_sink a2dp_source")]
|
||||
|
||||
|
||||
@host_config(a2dp_host, a2dp_host, mem=VM_MEM)
|
||||
def test_pipewire_a2dp(paired_hosts):
|
||||
host0, host1 = paired_hosts
|
||||
|
||||
# Connect
|
||||
host1.bluetoothctl.send(f"trust {host0.bdaddr}\n")
|
||||
host0.bluetoothctl.send(f"connect {host1.bdaddr}\n")
|
||||
|
||||
# Wait for pipewire devices to appear
|
||||
check_pipewire_devices_exist(host0, "a2dp-sink")
|
||||
|
||||
# Test streaming
|
||||
host1.pipewire.pw_record()
|
||||
host0.pipewire.pw_play()
|
||||
|
||||
assert host1.pipewire.pw_record_wait_signal()
|
||||
|
||||
|
||||
bap_ucast_host = [
|
||||
Bluetoothd(conf="[General]\nControllerMode = le\n", args=["-E", "-K"]),
|
||||
Bluetoothctl(),
|
||||
PipeWire(
|
||||
roles="bap_sink bap_source", uuids=("00001850-0000-1000-8000-00805f9b34fb",)
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
@host_config(bap_ucast_host, bap_ucast_host, mem=VM_MEM)
|
||||
def test_pipewire_bap_ucast(paired_hosts):
|
||||
host0, host1 = paired_hosts
|
||||
|
||||
# Connect
|
||||
host1.bluetoothctl.send(f"trust {host0.bdaddr}\n")
|
||||
|
||||
host0.bluetoothctl.send(f"scan off\n")
|
||||
host0.bluetoothctl.send(f"connect {host1.bdaddr}\n")
|
||||
|
||||
# Wait for pipewire devices to appear
|
||||
check_pipewire_devices_exist(host0, "bap-sink")
|
||||
|
||||
# Test streaming
|
||||
host1.pipewire.pw_record()
|
||||
host0.pipewire.pw_play()
|
||||
|
||||
assert host1.pipewire.pw_record_wait_signal()
|
||||
|
||||
|
||||
bcast_src_config = """
|
||||
monitor.bluez.properties = {
|
||||
bluez5.bcast_source.config = [
|
||||
{
|
||||
"broadcast_code": "Test",
|
||||
"encryption": false,
|
||||
"bis": [ { "qos_preset": "16_2_1" } ]
|
||||
}
|
||||
]
|
||||
}
|
||||
"""
|
||||
|
||||
bap_bcast_src_host = [
|
||||
Bluetoothd(conf="[General]\nControllerMode = le\n", args=["-E", "-K"]),
|
||||
Bluetoothctl(),
|
||||
PipeWire(
|
||||
roles="bap_bcast_source",
|
||||
uuids=("00001850-0000-1000-8000-00805f9b34fb",),
|
||||
config=bcast_src_config,
|
||||
),
|
||||
]
|
||||
|
||||
bap_bcast_snk_host = [
|
||||
Bluetoothd(conf="[General]\nControllerMode = le\n", args=["-E", "-K"]),
|
||||
Bluetoothctl(),
|
||||
PipeWire(roles="bap_bcast_sink", uuids=("00001850-0000-1000-8000-00805f9b34fb",)),
|
||||
]
|
||||
|
||||
|
||||
# BUG!: the bcast test is sometimes flaky because BlueZ has a hardcoded
|
||||
# BUG!: 3 sec DBus timeout and Wireplumber on the VM may not boot up
|
||||
# BUG!: fast enough
|
||||
|
||||
|
||||
@host_config(bap_bcast_src_host, bap_bcast_snk_host, mem=VM_MEM)
|
||||
def test_pipewire_bap_bcast(hosts):
|
||||
host0, host1 = hosts
|
||||
|
||||
# Start broadcasting
|
||||
check_pipewire_devices_exist(host0, "bap-sink")
|
||||
host0.pipewire.pw_play()
|
||||
|
||||
# Connect
|
||||
host1.bluetoothctl.send(f"scan on\n")
|
||||
|
||||
host0.bluetoothctl.send(f"advertise on\n")
|
||||
|
||||
host1.pipewire.pw_record()
|
||||
|
||||
idx, m = host1.bluetoothctl.expect(f"Transport (/org/bluez/hci0/.+)")
|
||||
transport = m[0].decode("utf-8")
|
||||
|
||||
# BUG!: issuing transport select immediately causes failure
|
||||
# BUG!: as it tries to enter broadcasting state while config(1)
|
||||
# BUG!: is not finished and BROADCASTING state gets cancelled via
|
||||
# BUG!: transport.c:bap_state_changed()
|
||||
# BUG!: -> transport_update_playing(transport, FALSE)
|
||||
# BUG!: -> transport_set_state(transport, TRANSPORT_STATE_IDLE)
|
||||
|
||||
# TODO: fix the bug and go to transport.select without waiting here
|
||||
check_pipewire_devices_exist(host1, "device")
|
||||
|
||||
host1.bluetoothctl.send(f"transport.select {transport}\n")
|
||||
|
||||
check_pipewire_devices_exist(host1, "bap-source")
|
||||
|
||||
# Test streaming
|
||||
assert host1.pipewire.pw_record_wait_signal()
|
||||
|
||||
|
||||
hfp_hf_host = [
|
||||
Bluetoothctl(),
|
||||
PipeWire(
|
||||
roles="hfp_hf",
|
||||
uuids=("0000111e-0000-1000-8000-00805f9b34fb",),
|
||||
),
|
||||
]
|
||||
|
||||
hfp_ag_host = [
|
||||
Bluetoothctl(),
|
||||
PipeWire(
|
||||
roles="hfp_ag",
|
||||
uuids=("0000111f-0000-1000-8000-00805f9b34fb",),
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
@host_config(hfp_ag_host, hfp_hf_host, mem=VM_MEM)
|
||||
def test_pipewire_hfp(paired_hosts):
|
||||
host0, host1 = paired_hosts
|
||||
|
||||
# Connect
|
||||
host1.bluetoothctl.send(f"trust {host0.bdaddr}\n")
|
||||
|
||||
host0.bluetoothctl.send(f"scan off\n")
|
||||
host0.bluetoothctl.send(f"connect {host1.bdaddr}\n")
|
||||
|
||||
# Wait for pipewire devices to appear
|
||||
check_pipewire_devices_exist(host0, "hfp")
|
||||
|
||||
# Test streaming
|
||||
host1.pipewire.pw_record()
|
||||
host0.pipewire.pw_play()
|
||||
|
||||
assert host1.pipewire.pw_record_wait_signal()
|
||||
|
||||
|
||||
def check_pipewire_devices_exist(host, profile="a2dp-sink"):
|
||||
factories = {
|
||||
"a2dp-sink": ("api.bluez5.a2dp.sink",),
|
||||
"a2dp-source": ("api.bluez5.a2dp.source",),
|
||||
"hfp": ("api.bluez5.sco.sink", "api.bluez5.sco.source"),
|
||||
"bap-sink": ("api.bluez5.media.sink",),
|
||||
"bap-source": ("api.bluez5.media.source",),
|
||||
"bap-duplex": ("api.bluez5.media.sink", "api.bluez5.media.source"),
|
||||
"device": ("bluez5",),
|
||||
}[profile]
|
||||
|
||||
text = ""
|
||||
|
||||
def cond():
|
||||
nonlocal text
|
||||
|
||||
text = host.pipewire.pw_dump()
|
||||
try:
|
||||
data = json.loads(text)
|
||||
except:
|
||||
return False
|
||||
|
||||
seen = set()
|
||||
for item in data:
|
||||
if item.get("type", None) == "PipeWire:Interface:Node":
|
||||
props = item["info"]["props"]
|
||||
seen.add(props.get("factory.name", None))
|
||||
continue
|
||||
if item.get("type", None) == "PipeWire:Interface:Device":
|
||||
props = item["info"]["props"]
|
||||
seen.add(props.get("device.api", None))
|
||||
continue
|
||||
|
||||
if not set(factories).difference(seen):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
try:
|
||||
wait_until(cond)
|
||||
except TimeoutError:
|
||||
assert False, f"pipewire devices not seen within timeout:\n{text}"
|
||||
31
test/pytest.ini
Normal file
31
test/pytest.ini
Normal file
|
|
@ -0,0 +1,31 @@
|
|||
#
|
||||
# To run Pytest tests, use
|
||||
#
|
||||
# meson devenv -C builddir -w . python3 -m pytest test --kernel-build -v
|
||||
#
|
||||
# meson devenv -C builddir -w . python3 -m pytest test --kernel /path/to/bzImage -v
|
||||
#
|
||||
# with bzImage built by yourself, or downloaded from some source eg.
|
||||
# https://github.com/pv/bluez-test-functional-kernel/releases/
|
||||
#
|
||||
# See https://github.com/pv/pytest-bluezenv or run
|
||||
#
|
||||
# python3 -m pytest test --help
|
||||
#
|
||||
|
||||
[pytest]
|
||||
log_format = %(asctime)s %(levelname)-6s %(name)-20s: %(message)s
|
||||
log_date_format = %Y-%m-%d %H:%M:%S.%f
|
||||
log_level = 0
|
||||
log_file = test-pytest.log
|
||||
markers =
|
||||
vm: tests requiring BlueZ VM environment
|
||||
|
||||
addopts =
|
||||
-p pytest_bluezenv
|
||||
|
||||
norecursedirs =
|
||||
data
|
||||
.pytest_cache
|
||||
|
||||
vm_timeout = 30
|
||||
Loading…
Add table
Add a link
Reference in a new issue