labwc/scripts/wl_compcheck.py

159 lines
4.3 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
import os
import socket
import struct
# object ids
WL_DISPLAY = 1
WL_REGISTRY = 2
WL_SYNC_DONE = 3
# opcodes requests
WL_DISPLAY_SYNC = 0
WL_DISPLAY_GET_REGISTRY = 1
# opcodes events
WL_REGISTRY_GLOBAL = 0
class ArgString:
def parse(data):
size = struct.unpack('=I', data[:4])[0]
data = data[4:4 + size - 1]
padding = (4 - (size % 4)) % 4
return 4 + size + padding, data.decode()
class ArgRegistryGlobal:
def parse(data):
global_id = struct.unpack('=I', data[:4])[0]
data = data[4:]
consumed, interface = ArgString.parse(data)
data = data[consumed:]
version = struct.unpack('=I', data)[0]
return global_id, interface, version
class Wayland:
def __init__(self, wl_socket, log=False):
if log:
print()
print(f' Connecting to {wl_socket}')
self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
self.socket.connect(wl_socket)
try:
creds = self.socket.getsockopt(
socket.SOL_SOCKET, socket.SO_PEERCRED, struct.calcsize('3i')
)
pid, uid, gid = struct.unpack('3i', creds)
with open(f'/proc/{pid}/comm', 'r') as f:
self.name = f.read().strip()
except:
self.name = 'Unknown'
if log:
print(f" Connected to {self.name}\n")
def wire(self, obj_id, opcode, data=b''):
size = 8 + len(data)
sizeop = size << 16 | opcode
self.socket.send(struct.pack('=II', obj_id, sizeop) + data)
def wire_arg_uint32(self, to_obj_id, opcode, arg):
self.wire(to_obj_id, opcode, struct.pack('=I', arg))
def parse_msg(self, obj_id, opcode, arg_data, interfaces):
if obj_id == WL_REGISTRY and opcode == WL_REGISTRY_GLOBAL:
_, interface, version = ArgRegistryGlobal.parse(arg_data)
interfaces[interface] = version
return True
return False
def get_interfaces(self):
self.wire_arg_uint32(WL_DISPLAY, WL_DISPLAY_GET_REGISTRY, WL_REGISTRY)
self.wire_arg_uint32(WL_DISPLAY, WL_DISPLAY_SYNC, WL_SYNC_DONE)
interfaces = dict()
old_data = b''
data = self.socket.recv(4096)
while data:
data = old_data + data
while len(data) >= 8:
obj_id, sizeop = struct.unpack('=II', data[:8])
size = sizeop >> 16
op = sizeop & 0xffff
if len(data) < size:
break
arg_data = data[8:size]
if obj_id == WL_DISPLAY:
# Ignore error and delete_id events
pass
elif obj_id == WL_SYNC_DONE:
# All interfaces have been announced
self.socket.shutdown(socket.SHUT_RDWR)
self.socket.close()
return interfaces
elif self.parse_msg(obj_id, op, arg_data, interfaces):
pass
else:
print(f"Unknown message received: obj_id {obj_id} op {op}")
data = data[size:]
old_data = data
data = self.socket.recv(4096)
wl_socket = os.path.basename(self.socket.getpeername())
print(f"error in wayland communication with {self.name} @ {wl_socket}\n")
self.socket.shutdown(socket.SHUT_RDWR)
self.socket.close()
return interfaces
if __name__ == '__main__':
import sys
runtime_dir = os.getenv('XDG_RUNTIME_DIR')
if not runtime_dir:
print("XDG_RUNTIME_DIR not set")
exit(1)
def find_wl_sockets(sockets):
x = 0
while True:
try:
os.stat(os.path.join(runtime_dir, f'wayland-{x}'))
sockets.append(f'wayland-{x}')
except FileNotFoundError:
break
x += 1
compositors = dict()
wl_sockets = sys.argv[1:]
if not wl_sockets:
find_wl_sockets(wl_sockets)
for wl_socket in wl_sockets:
wl = Wayland(os.path.join(runtime_dir, wl_socket), log=len(wl_sockets) == 1)
if len(wl_sockets) == 1:
print(" {:<45s} {:>2}".format("Interface", "Version"))
for name, version in sorted(wl.get_interfaces().items()):
print(" {:<45s} {:>2}".format(name, version))
print()
exit(0)
compositors[wl_socket] = (wl.name, wl.get_interfaces())
all_interfaces = set()
for _, (_, interfaces) in compositors.items():
all_interfaces |= set(interfaces.items())
for compositor, (compositor_name, interfaces) in compositors.items():
missing = all_interfaces - set(interfaces.items())
for name, version in set(missing):
if interfaces.get(name, 0) > version:
missing.remove((name, version))
if missing:
print()
print(f"\x1b[1m Protocols missing from {compositor_name} @ {compositor}\x1b[m")
for name, version in sorted(missing):
own_version = interfaces.get(name, 0)
print(" {:<45s} {:>2} {}".format(name, version,
f'(has version {own_version})' if own_version else '')
)
print()