diff --git a/test/bluezenv/__init__.py b/test/bluezenv/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/test/bluezenv/conftest.py b/test/bluezenv/conftest.py new file mode 100644 index 000000000..03c635ed5 --- /dev/null +++ b/test/bluezenv/conftest.py @@ -0,0 +1,95 @@ +# -*- coding: utf-8; mode: python; eval: (blacken-mode); -*- +# SPDX-FileCopyrightText: Copyright © 2026 Pauli Virtanen +# SPDX-License-Identifier: MIT + +import pytest +import warnings + +from pytest_bluezenv import Bluetoothd + + +@pytest.fixture +def paired_hosts(hosts, host_setup): + """ + Provide two hosts, paired with each other, and the first one Central + """ + + le = any( + "ControllerMode = le" in (p.conf or "") + for plugins in host_setup["setup"] + for p in plugins + if isinstance(p, Bluetoothd) + ) + + if le: + yield from _pair_le(hosts) + else: + yield from _pair_bredr(hosts) + + +def _pair_bredr(hosts): + host0, host1 = hosts + + host0.bluetoothctl.send("scan on\n") + host0.bluetoothctl.expect(f"Controller {host0.bdaddr.upper()} Discovering: yes") + + host1.bluetoothctl.send("pairable on\n") + host1.bluetoothctl.expect("Changing pairable on succeeded") + host1.bluetoothctl.send("discoverable on\n") + host1.bluetoothctl.expect(f"Controller {host1.bdaddr.upper()} Discoverable: yes") + + host0.bluetoothctl.expect(f"Device {host1.bdaddr.upper()}") + host0.bluetoothctl.send(f"pair {host1.bdaddr}\n") + + idx, m = host0.bluetoothctl.expect(r"Confirm passkey (\d+).*:") + key = m[0].decode("utf-8") + + host1.bluetoothctl.expect(f"Confirm passkey {key}") + + host0.bluetoothctl.send("yes\n") + host1.bluetoothctl.send("yes\n") + + host0.bluetoothctl.expect("Pairing successful") + + yield hosts + + +def _pair_le(hosts): + host0, host1 = hosts + + host0.bluetoothctl.send("scan on\n") + host0.bluetoothctl.expect(f"Controller {host0.bdaddr.upper()} Discovering: yes") + + host1.bluetoothctl.send("advertise on\n") + host1.bluetoothctl.expect("Advertising object registered") + + host0.bluetoothctl.expect(f"Device {host1.bdaddr.upper()}") + host0.bluetoothctl.send(f"pair {host1.bdaddr.upper()}\n") + + # BUG!: if controller is power cycled off/on at boot (before bluetoothd) + # BUG!: which is what the tester here does, + # BUG!: bluetoothd MGMT command to enable Secure Connections Host Support + # BUG!: fails and we are left with legacy passkey. It seems we get randomly + # BUG!: one of these depending on what state controller/kernel were before + # BUG!: btmgmt power off/on + + idx, m = host0.bluetoothctl.expect( + [r"\[agent\].*Passkey:.*m(\d+)", r"Confirm passkey (\d+).*:"] + ) + key = m[0].decode("utf-8") + + if idx == 0: + warnings.warn( + "BUG: we got passkey authentication, bluetoothd/kernel should be fixed" + ) + host1.bluetoothctl.expect(r"\[agent\] Enter passkey \(number in 0-999999\):") + host1.bluetoothctl.send(f"{key}\n") + else: + host1.bluetoothctl.expect(f"Confirm passkey {key}") + + host0.bluetoothctl.send("yes\n") + host1.bluetoothctl.send("yes\n") + + host0.bluetoothctl.expect("Pairing successful") + + yield hosts diff --git a/test/bluezenv/test_streaming.py b/test/bluezenv/test_streaming.py new file mode 100644 index 000000000..a67304751 --- /dev/null +++ b/test/bluezenv/test_streaming.py @@ -0,0 +1,554 @@ +# -*- coding: utf-8; mode: python; eval: (blacken-mode); -*- +# SPDX-FileCopyrightText: Copyright © 2026 Pauli Virtanen +# SPDX-License-Identifier: MIT +""" +Tests for PipeWire audio streaming + +To use uninstalled version of PipeWire, run the tests in PipeWire +devenv:: + + meson devenv -C ../pipewire/builddir -w . python3 -mpytest test/bluezenv -v +""" +import sys +import os +import re +import pytest +import subprocess +import tempfile +import time +import logging +import json +import dbus +import threading +from pathlib import Path + +import pytest + +from pytest_bluezenv import ( + HostPlugin, + host_config, + find_exe, + Bluetoothd, + Bluetoothctl, + DbusSession, + LogStream, + wait_until, + mainloop_wrap, +) + +pytestmark = [pytest.mark.vm] + +log = logging.getLogger(__name__) + +# Use larger VM instances in case ASAN is enabled +VM_MEM = "512M" + + +class PipeWire(HostPlugin): + """ + Launch PipeWire in VM instance + """ + + name = "pipewire" + depends = [DbusSession(), Bluetoothd()] + + def __init__( + self, + uuids=( + "0000110a-0000-1000-8000-00805f9b34fb", + "0000110b-0000-1000-8000-00805f9b34fb", + ), + roles="a2dp_sink a2dp_source", + config=None, + ): + self.uuids = tuple(uuids) + self.roles = str(roles) + self.config = config + + # For running PipeWire from build directory + self.devenv = {} + if os.environ.get("PW_UNINSTALLED"): + devenv_keys = [ + "WIREPLUMBER_MODULE_DIR", + "WIREPLUMBER_CONFIG_DIR", + "WIREPLUMBER_DATA_DIR", + "PIPEWIRE_CONFIG_DIR", + "PIPEWIRE_MODULE_DIR", + "SPA_PLUGIN_DIR", + "SPA_DATA_DIR", + "ACP_PATHS_DIR", + "ACP_PROFILES_DIR", + "GST_PLUGIN_PATH", + "ALSA_PLUGIN_DIR", + "LD_LIBRARY_PATH", + "PW_UNINSTALLED", + "PW_BUILDDIR", + "PATH", + ] + for key in devenv_keys: + value = os.environ.get(key) + if value is not None: + self.devenv[key] = value + + def presetup(self, config): + try: + self.exe_pw = find_exe("", "pipewire") + self.exe_wp = find_exe("", "wireplumber") + self.exe_dump = find_exe("", "pw-dump") + self.exe_play = find_exe("", "pw-play") + self.exe_record = find_exe("", "pw-record") + + # get versions + res = subprocess.run( + [self.exe_pw, "--version"], stdout=subprocess.PIPE, encoding="utf-8" + ) + m = re.search("libpipewire ([0-9.]+)", res.stdout) + if m: + pw_version = tuple(int(x) for x in m.group(1).split(".")) + else: + raise ValueError("pipewire version unknown") + + res = subprocess.run( + [self.exe_wp, "--version"], stdout=subprocess.PIPE, encoding="utf-8" + ) + m = re.search("libwireplumber ([0-9.]+)", res.stdout) + if m: + wp_version = tuple(int(x) for x in m.group(1).split(".")) + else: + raise ValueError("wireplumber version unknown") + + # check versions + if pw_version >= (1, 6, 0) and pw_version <= (1, 6, 2): + raise ValueError("buggy pipewire version") + if pw_version < (1, 4, 9): + raise ValueError("pipewire too old") + if wp_version < (0, 5, 8): + raise ValueError("wireplumber too old") + except (FileNotFoundError, ValueError) as exc: + pytest.skip(reason=f"PipeWire: {exc}") + + @mainloop_wrap + def setup(self, impl): + self.play = None + self.record = None + self.log = logging.getLogger(self.name) + + self.tmpdir = tempfile.TemporaryDirectory(prefix="pipewire-", dir="/run") + conf_dir = Path(self.tmpdir.name) / "config" + runtime_dir = Path(self.tmpdir.name) / "runtime" + state_dir = Path(self.tmpdir.name) / "state" + + dropin_dir = conf_dir / "wireplumber" / "wireplumber.conf.d" + wp_conf = dropin_dir / "01-config.conf" + wp_extra_conf = dropin_dir / "02-extra-config.conf" + + conf_dir.mkdir() + runtime_dir.mkdir() + dropin_dir.mkdir(parents=True) + state_dir.mkdir() + + self.environ = environ = dict(os.environ) + environ.update(self.devenv) + + environ["XDG_CONFIG_HOME"] = str(conf_dir) + environ["XDG_STATE_HOME"] = str(runtime_dir) + environ["XDG_RUNTIME_HOME"] = str(runtime_dir) + environ["PIPEWIRE_RUNTIME_DIR"] = str(runtime_dir) + environ["XDG_STATE_HOME"] = str(state_dir) + environ["PIPEWIRE_DEBUG"] = "2" + environ["WIREPLUMBER_DEBUG"] = ( + "spa.bluez5.iso:3,spa.bluez5*:4,s-monitors:4,m-lua-scripting:4,s-linking:4,s-device:4" + ) + + # Handle devenv + if "WIREPLUMBER_CONFIG_DIR" in environ: + environ["WIREPLUMBER_CONFIG_DIR"] = ( + environ["WIREPLUMBER_CONFIG_DIR"] + ":" + str(conf_dir / "wireplumber") + ) + + with open(wp_conf, "w") as f: + text = f""" + monitor.bluez.properties = {{ + bluez5.roles = [ {self.roles} ] + bluez5.decode-buffer.latency = 4096 + }} + """ + f.write(text) + + if self.config is not None: + with open(wp_extra_conf, "w") as f: + f.write(self.config) + + log.info(f"Starting pipewire: {self.exe_pw}") + + self.logger = LogStream("pipewire") + self.pw = subprocess.Popen( + self.exe_pw, + env=environ, + stdout=self.logger.stream, + stderr=subprocess.STDOUT, + ) + + log.info(f"Starting wireplumber: {self.exe_wp}") + + self.wp = subprocess.Popen( + self.exe_wp, + env=environ, + stdout=self.logger.stream, + stderr=subprocess.STDOUT, + ) + + # Wait for PipeWire's bluetooth services + log.info("Wait for PipeWire...") + bus = dbus.SystemBus() + bus.set_exit_on_disconnect(False) + adapter = dbus.Interface( + bus.get_object("org.bluez", "/org/bluez/hci0"), + "org.freedesktop.DBus.Properties", + ) + + def cond(): + uuids = [str(uuid) for uuid in adapter.Get("org.bluez.Adapter1", "UUIDs")] + return all(uuid in uuids for uuid in self.uuids) + + wait_until(cond) + + os.environ["PIPEWIRE_RUNTIME_DIR"] = str(runtime_dir) + + # Wait for wireplumber session services + text = None + + def cond(): + nonlocal text + + text = self.pw_dump() + try: + data = json.loads(text) + except: + return False + for item in data: + if item.get("type", None) != "PipeWire:Interface:Client": + continue + if item["info"]["props"]["application.name"] != "WirePlumber": + continue + if "api.bluez" in item["info"]["props"].get("session.services", ""): + return True + return False + + try: + wait_until(cond) + except: + raise TimeoutError(f"PipeWire not ready\n{text}") + + log.info("PipeWire ready") + + def pw_dump(self): + ret = subprocess.run( + [self.exe_dump], stdout=subprocess.PIPE, encoding="utf-8", env=self.environ + ) + + return ret.stdout + + def pw_play(self): + self.play = subprocess.Popen( + [ + self.exe_play, + "--raw", + "--rate", + "4000", + "--channels", + "1", + "--format", + "s8", + "-", + ], + stdin=subprocess.PIPE, + env=self.environ, + ) + self.play_thread = threading.Thread( + target=self._play_thread, args=(self.play.stdin,) + ) + self.play_thread.start() + + def _play_thread(self, stream): + block = bytes([j % 256 for j in range(4096)]) + while True: + try: + stream.write(block) + except: + self.log.info("pw_play ended") + break + + def pw_record(self): + self.record = subprocess.Popen( + [ + self.exe_record, + "-P", + "media.class=Audio/Sink", + "--raw", + "--format", + "s8", + "--rate", + "4000", + "--channels", + "1", + "-", + ], + stdout=subprocess.PIPE, + env=self.environ, + ) + self.record_thread = threading.Thread( + target=self._record_thread, args=(self.record.stdout,) + ) + self.record_thread.start() + self.record_signal = threading.Event() + + def _record_thread(self, stream): + while True: + try: + block = stream.read(256) + if not block: + break + except: + self.log.info("pw_record failed") + break + + # If we get anything nonzero, some signal is getting + # through. Can't check exactness due to encoding and + # possibly heavy underruns in VM environment. + if any(list(block)): + self.log.info("pw_record signal found") + self.record_success = True + self.record_signal.set() + return + else: + self.log.debug("pw_record: waiting for signal") + + self.log.error("pw_record: no signal found") + self.record_success = False + self.record_signal.set() + + def pw_record_wait_signal(self, timeout=160): + res = self.record_signal.wait(timeout=timeout) + return res and self.record_success + + def teardown(self): + log.info("Stop pipewire") + self.pw.terminate() + self.wp.terminate() + if self.play is not None: + try: + self.play.stdin.close() + except BrokenPipeError: + pass + self.play.terminate() + self.play_thread.join() + if self.record is not None: + try: + self.record.stdout.close() + except BrokenPipeError: + pass + self.record.terminate() + self.record_thread.join() + self.tmpdir.cleanup() + + +a2dp_host = [Bluetoothctl(), PipeWire(roles="a2dp_sink a2dp_source")] + + +@host_config(a2dp_host, a2dp_host, mem=VM_MEM) +def test_pipewire_a2dp(paired_hosts): + host0, host1 = paired_hosts + + # Connect + host1.bluetoothctl.send(f"trust {host0.bdaddr}\n") + host0.bluetoothctl.send(f"connect {host1.bdaddr}\n") + + # Wait for pipewire devices to appear + check_pipewire_devices_exist(host0, "a2dp-sink") + + # Test streaming + host1.pipewire.pw_record() + host0.pipewire.pw_play() + + assert host1.pipewire.pw_record_wait_signal() + + +bap_ucast_host = [ + Bluetoothd(conf="[General]\nControllerMode = le\n", args=["-E", "-K"]), + Bluetoothctl(), + PipeWire( + roles="bap_sink bap_source", uuids=("00001850-0000-1000-8000-00805f9b34fb",) + ), +] + + +@host_config(bap_ucast_host, bap_ucast_host, mem=VM_MEM) +def test_pipewire_bap_ucast(paired_hosts): + host0, host1 = paired_hosts + + # Connect + host1.bluetoothctl.send(f"trust {host0.bdaddr}\n") + + host0.bluetoothctl.send(f"scan off\n") + host0.bluetoothctl.send(f"connect {host1.bdaddr}\n") + + # Wait for pipewire devices to appear + check_pipewire_devices_exist(host0, "bap-sink") + + # Test streaming + host1.pipewire.pw_record() + host0.pipewire.pw_play() + + assert host1.pipewire.pw_record_wait_signal() + + +bcast_src_config = """ +monitor.bluez.properties = { + bluez5.bcast_source.config = [ + { + "broadcast_code": "Test", + "encryption": false, + "bis": [ { "qos_preset": "16_2_1" } ] + } + ] +} +""" + +bap_bcast_src_host = [ + Bluetoothd(conf="[General]\nControllerMode = le\n", args=["-E", "-K"]), + Bluetoothctl(), + PipeWire( + roles="bap_bcast_source", + uuids=("00001850-0000-1000-8000-00805f9b34fb",), + config=bcast_src_config, + ), +] + +bap_bcast_snk_host = [ + Bluetoothd(conf="[General]\nControllerMode = le\n", args=["-E", "-K"]), + Bluetoothctl(), + PipeWire(roles="bap_bcast_sink", uuids=("00001850-0000-1000-8000-00805f9b34fb",)), +] + + +# BUG!: the bcast test is sometimes flaky because BlueZ has a hardcoded +# BUG!: 3 sec DBus timeout and Wireplumber on the VM may not boot up +# BUG!: fast enough + + +@host_config(bap_bcast_src_host, bap_bcast_snk_host, mem=VM_MEM) +def test_pipewire_bap_bcast(hosts): + host0, host1 = hosts + + # Start broadcasting + check_pipewire_devices_exist(host0, "bap-sink") + host0.pipewire.pw_play() + + # Connect + host1.bluetoothctl.send(f"scan on\n") + + host0.bluetoothctl.send(f"advertise on\n") + + host1.pipewire.pw_record() + + idx, m = host1.bluetoothctl.expect(f"Transport (/org/bluez/hci0/.+)") + transport = m[0].decode("utf-8") + + # BUG!: issuing transport select immediately causes failure + # BUG!: as it tries to enter broadcasting state while config(1) + # BUG!: is not finished and BROADCASTING state gets cancelled via + # BUG!: transport.c:bap_state_changed() + # BUG!: -> transport_update_playing(transport, FALSE) + # BUG!: -> transport_set_state(transport, TRANSPORT_STATE_IDLE) + + # TODO: fix the bug and go to transport.select without waiting here + check_pipewire_devices_exist(host1, "device") + + host1.bluetoothctl.send(f"transport.select {transport}\n") + + check_pipewire_devices_exist(host1, "bap-source") + + # Test streaming + assert host1.pipewire.pw_record_wait_signal() + + +hfp_hf_host = [ + Bluetoothctl(), + PipeWire( + roles="hfp_hf", + uuids=("0000111e-0000-1000-8000-00805f9b34fb",), + ), +] + +hfp_ag_host = [ + Bluetoothctl(), + PipeWire( + roles="hfp_ag", + uuids=("0000111f-0000-1000-8000-00805f9b34fb",), + ), +] + + +@host_config(hfp_ag_host, hfp_hf_host, mem=VM_MEM) +def test_pipewire_hfp(paired_hosts): + host0, host1 = paired_hosts + + # Connect + host1.bluetoothctl.send(f"trust {host0.bdaddr}\n") + + host0.bluetoothctl.send(f"scan off\n") + host0.bluetoothctl.send(f"connect {host1.bdaddr}\n") + + # Wait for pipewire devices to appear + check_pipewire_devices_exist(host0, "hfp") + + # Test streaming + host1.pipewire.pw_record() + host0.pipewire.pw_play() + + assert host1.pipewire.pw_record_wait_signal() + + +def check_pipewire_devices_exist(host, profile="a2dp-sink"): + factories = { + "a2dp-sink": ("api.bluez5.a2dp.sink",), + "a2dp-source": ("api.bluez5.a2dp.source",), + "hfp": ("api.bluez5.sco.sink", "api.bluez5.sco.source"), + "bap-sink": ("api.bluez5.media.sink",), + "bap-source": ("api.bluez5.media.source",), + "bap-duplex": ("api.bluez5.media.sink", "api.bluez5.media.source"), + "device": ("bluez5",), + }[profile] + + text = "" + + def cond(): + nonlocal text + + text = host.pipewire.pw_dump() + try: + data = json.loads(text) + except: + return False + + seen = set() + for item in data: + if item.get("type", None) == "PipeWire:Interface:Node": + props = item["info"]["props"] + seen.add(props.get("factory.name", None)) + continue + if item.get("type", None) == "PipeWire:Interface:Device": + props = item["info"]["props"] + seen.add(props.get("device.api", None)) + continue + + if not set(factories).difference(seen): + return True + + return False + + try: + wait_until(cond) + except TimeoutError: + assert False, f"pipewire devices not seen within timeout:\n{text}" diff --git a/test/pytest.ini b/test/pytest.ini new file mode 100644 index 000000000..597ca1b68 --- /dev/null +++ b/test/pytest.ini @@ -0,0 +1,31 @@ +# +# To run Pytest tests, use +# +# meson devenv -C builddir -w . python3 -m pytest test --kernel-build -v +# +# meson devenv -C builddir -w . python3 -m pytest test --kernel /path/to/bzImage -v +# +# with bzImage built by yourself, or downloaded from some source eg. +# https://github.com/pv/bluez-test-functional-kernel/releases/ +# +# See https://github.com/pv/pytest-bluezenv or run +# +# python3 -m pytest test --help +# + +[pytest] +log_format = %(asctime)s %(levelname)-6s %(name)-20s: %(message)s +log_date_format = %Y-%m-%d %H:%M:%S.%f +log_level = 0 +log_file = test-pytest.log +markers = + vm: tests requiring BlueZ VM environment + +addopts = + -p pytest_bluezenv + +norecursedirs = + data + .pytest_cache + +vm_timeout = 30