#!/usr/bin/env python3 import os import socket import struct # object ids WL_DISPLAY = 1 WL_REGISTRY = 2 WL_SYNC_DONE = 3 # opcodes requests WL_DISPLAY_SYNC = 0 WL_DISPLAY_GET_REGISTRY = 1 # opcodes events WL_REGISTRY_GLOBAL = 0 class ArgString: def parse(data): size = struct.unpack('=I', data[:4])[0] data = data[4:4 + size - 1] padding = (4 - (size % 4)) % 4 return 4 + size + padding, data.decode() class ArgRegistryGlobal: def parse(data): global_id = struct.unpack('=I', data[:4])[0] data = data[4:] consumed, interface = ArgString.parse(data) data = data[consumed:] version = struct.unpack('=I', data)[0] return global_id, interface, version class Wayland: def __init__(self, wl_socket, log=False): if log: print() print(f' Connecting to {wl_socket}') self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0) self.socket.connect(wl_socket) try: creds = self.socket.getsockopt( socket.SOL_SOCKET, socket.SO_PEERCRED, struct.calcsize('3i') ) pid, uid, gid = struct.unpack('3i', creds) with open(f'/proc/{pid}/comm', 'r') as f: self.name = f.read().strip() except: self.name = 'Unknown' if log: print(f" Connected to {self.name}\n") def wire(self, obj_id, opcode, data=b''): size = 8 + len(data) sizeop = size << 16 | opcode self.socket.send(struct.pack('=II', obj_id, sizeop) + data) def wire_arg_uint32(self, to_obj_id, opcode, arg): self.wire(to_obj_id, opcode, struct.pack('=I', arg)) def parse_msg(self, obj_id, opcode, arg_data, interfaces): if obj_id == WL_REGISTRY and opcode == WL_REGISTRY_GLOBAL: _, interface, version = ArgRegistryGlobal.parse(arg_data) interfaces[interface] = version return True return False def get_interfaces(self): self.wire_arg_uint32(WL_DISPLAY, WL_DISPLAY_GET_REGISTRY, WL_REGISTRY) self.wire_arg_uint32(WL_DISPLAY, WL_DISPLAY_SYNC, WL_SYNC_DONE) interfaces = dict() old_data = b'' data = self.socket.recv(4096) while data: data = old_data + data while len(data) >= 8: obj_id, sizeop = struct.unpack('=II', data[:8]) size = sizeop >> 16 op = sizeop & 0xffff if len(data) < size: break arg_data = data[8:size] if obj_id == WL_DISPLAY: # Ignore error and delete_id events pass elif obj_id == WL_SYNC_DONE: # All interfaces have been announced self.socket.shutdown(socket.SHUT_RDWR) self.socket.close() return interfaces elif self.parse_msg(obj_id, op, arg_data, interfaces): pass else: print(f"Unknown message received: obj_id {obj_id} op {op}") data = data[size:] old_data = data data = self.socket.recv(4096) wl_socket = os.path.basename(self.socket.getpeername()) print(f"error in wayland communication with {self.name} @ {wl_socket}\n") self.socket.shutdown(socket.SHUT_RDWR) self.socket.close() return interfaces if __name__ == '__main__': import sys runtime_dir = os.getenv('XDG_RUNTIME_DIR') if not runtime_dir: print("XDG_RUNTIME_DIR not set") exit(1) def find_wl_sockets(sockets): x = 0 while True: try: os.stat(os.path.join(runtime_dir, f'wayland-{x}')) sockets.append(f'wayland-{x}') except FileNotFoundError: break x += 1 compositors = dict() wl_sockets = sys.argv[1:] if not wl_sockets: find_wl_sockets(wl_sockets) for wl_socket in wl_sockets: wl = Wayland(os.path.join(runtime_dir, wl_socket), log=len(wl_sockets) == 1) if len(wl_sockets) == 1: print(" {:<45s} {:>2}".format("Interface", "Version")) for name, version in sorted(wl.get_interfaces().items()): print(" {:<45s} {:>2}".format(name, version)) print() exit(0) compositors[wl_socket] = (wl.name, wl.get_interfaces()) all_interfaces = set() for _, (_, interfaces) in compositors.items(): all_interfaces |= set(interfaces.items()) for compositor, (compositor_name, interfaces) in compositors.items(): missing = all_interfaces - set(interfaces.items()) for name, version in set(missing): if interfaces.get(name, 0) > version: missing.remove((name, version)) if missing: print() print(f"\x1b[1m Protocols missing from {compositor_name} @ {compositor}\x1b[m") for name, version in sorted(missing): own_version = interfaces.get(name, 0) print(" {:<45s} {:>2} {}".format(name, version, f'(has version {own_version})' if own_version else '') ) print()