mirror of
https://github.com/labwc/labwc.git
synced 2025-10-29 05:40:24 -04:00
Simple utility to allow protocol and version comparison between different running compositors. This can be used to detect missing protocols or versions in labwc. See for example this result when running against the current labwc master and sway 1.10 (nested, results on DRM may differ): Protocols missing from labwc @ wayland-0 ext_transient_seat_manager_v1 1 wl_compositor 6 (has version 5) wp_alpha_modifier_v1 1 wp_content_type_manager_v1 1 xdg_wm_base 5 (has version 3) zwp_keyboard_shortcuts_inhibit_manager_v1 1 Protocols missing from sway @ wayland-1 wl_drm 2 zcosmic_workspace_manager_v1 1
158 lines
4.3 KiB
Python
Executable file
158 lines
4.3 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
|
|
import os
|
|
import socket
|
|
import struct
|
|
|
|
# object ids
|
|
WL_DISPLAY = 1
|
|
WL_REGISTRY = 2
|
|
WL_SYNC_DONE = 3
|
|
|
|
# opcodes requests
|
|
WL_DISPLAY_SYNC = 0
|
|
WL_DISPLAY_GET_REGISTRY = 1
|
|
|
|
# opcodes events
|
|
WL_REGISTRY_GLOBAL = 0
|
|
|
|
class ArgString:
|
|
def parse(data):
|
|
size = struct.unpack('=I', data[:4])[0]
|
|
data = data[4:4 + size - 1]
|
|
padding = (4 - (size % 4)) % 4
|
|
return 4 + size + padding, data.decode()
|
|
|
|
class ArgRegistryGlobal:
|
|
def parse(data):
|
|
global_id = struct.unpack('=I', data[:4])[0]
|
|
data = data[4:]
|
|
consumed, interface = ArgString.parse(data)
|
|
data = data[consumed:]
|
|
version = struct.unpack('=I', data)[0]
|
|
return global_id, interface, version
|
|
|
|
class Wayland:
|
|
def __init__(self, wl_socket, log=False):
|
|
if log:
|
|
print()
|
|
print(f' Connecting to {wl_socket}')
|
|
self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
|
|
self.socket.connect(wl_socket)
|
|
try:
|
|
creds = self.socket.getsockopt(
|
|
socket.SOL_SOCKET, socket.SO_PEERCRED, struct.calcsize('3i')
|
|
)
|
|
pid, uid, gid = struct.unpack('3i', creds)
|
|
with open(f'/proc/{pid}/comm', 'r') as f:
|
|
self.name = f.read().strip()
|
|
except:
|
|
self.name = 'Unknown'
|
|
if log:
|
|
print(f" Connected to {self.name}\n")
|
|
|
|
def wire(self, obj_id, opcode, data=b''):
|
|
size = 8 + len(data)
|
|
sizeop = size << 16 | opcode
|
|
self.socket.send(struct.pack('=II', obj_id, sizeop) + data)
|
|
|
|
def wire_arg_uint32(self, to_obj_id, opcode, arg):
|
|
self.wire(to_obj_id, opcode, struct.pack('=I', arg))
|
|
|
|
def parse_msg(self, obj_id, opcode, arg_data, interfaces):
|
|
if obj_id == WL_REGISTRY and opcode == WL_REGISTRY_GLOBAL:
|
|
_, interface, version = ArgRegistryGlobal.parse(arg_data)
|
|
interfaces[interface] = version
|
|
return True
|
|
return False
|
|
|
|
def get_interfaces(self):
|
|
self.wire_arg_uint32(WL_DISPLAY, WL_DISPLAY_GET_REGISTRY, WL_REGISTRY)
|
|
self.wire_arg_uint32(WL_DISPLAY, WL_DISPLAY_SYNC, WL_SYNC_DONE)
|
|
|
|
interfaces = dict()
|
|
|
|
old_data = b''
|
|
data = self.socket.recv(4096)
|
|
while data:
|
|
data = old_data + data
|
|
while len(data) >= 8:
|
|
obj_id, sizeop = struct.unpack('=II', data[:8])
|
|
size = sizeop >> 16
|
|
op = sizeop & 0xffff
|
|
if len(data) < size:
|
|
break
|
|
arg_data = data[8:size]
|
|
if obj_id == WL_DISPLAY:
|
|
# Ignore error and delete_id events
|
|
pass
|
|
elif obj_id == WL_SYNC_DONE:
|
|
# All interfaces have been announced
|
|
self.socket.shutdown(socket.SHUT_RDWR)
|
|
self.socket.close()
|
|
return interfaces
|
|
elif self.parse_msg(obj_id, op, arg_data, interfaces):
|
|
pass
|
|
else:
|
|
print(f"Unknown message received: obj_id {obj_id} op {op}")
|
|
data = data[size:]
|
|
old_data = data
|
|
data = self.socket.recv(4096)
|
|
|
|
wl_socket = os.path.basename(self.socket.getpeername())
|
|
print(f"error in wayland communication with {self.name} @ {wl_socket}\n")
|
|
self.socket.shutdown(socket.SHUT_RDWR)
|
|
self.socket.close()
|
|
return interfaces
|
|
|
|
if __name__ == '__main__':
|
|
import sys
|
|
|
|
runtime_dir = os.getenv('XDG_RUNTIME_DIR')
|
|
if not runtime_dir:
|
|
print("XDG_RUNTIME_DIR not set")
|
|
exit(1)
|
|
|
|
def find_wl_sockets(sockets):
|
|
x = 0
|
|
while True:
|
|
try:
|
|
os.stat(os.path.join(runtime_dir, f'wayland-{x}'))
|
|
sockets.append(f'wayland-{x}')
|
|
except FileNotFoundError:
|
|
break
|
|
x += 1
|
|
|
|
compositors = dict()
|
|
wl_sockets = sys.argv[1:]
|
|
if not wl_sockets:
|
|
find_wl_sockets(wl_sockets)
|
|
|
|
for wl_socket in wl_sockets:
|
|
wl = Wayland(os.path.join(runtime_dir, wl_socket), log=len(wl_sockets) == 1)
|
|
if len(wl_sockets) == 1:
|
|
print(" {:<45s} {:>2}".format("Interface", "Version"))
|
|
for name, version in sorted(wl.get_interfaces().items()):
|
|
print(" {:<45s} {:>2}".format(name, version))
|
|
print()
|
|
exit(0)
|
|
compositors[wl_socket] = (wl.name, wl.get_interfaces())
|
|
|
|
all_interfaces = set()
|
|
for _, (_, interfaces) in compositors.items():
|
|
all_interfaces |= set(interfaces.items())
|
|
|
|
for compositor, (compositor_name, interfaces) in compositors.items():
|
|
missing = all_interfaces - set(interfaces.items())
|
|
for name, version in set(missing):
|
|
if interfaces.get(name, 0) > version:
|
|
missing.remove((name, version))
|
|
if missing:
|
|
print()
|
|
print(f"\x1b[1m Protocols missing from {compositor_name} @ {compositor}\x1b[m")
|
|
for name, version in sorted(missing):
|
|
own_version = interfaces.get(name, 0)
|
|
print(" {:<45s} {:>2} {}".format(name, version,
|
|
f'(has version {own_version})' if own_version else '')
|
|
)
|
|
print()
|