"""Command-line client for Decky Loader: install plugins and manage store settings.

The client fetches an auth token over HTTP, then talks to the loader through a
small hand-rolled WebSocket implementation built on asyncio streams (text
frames carrying JSON messages).
"""

import argparse
import asyncio
import base64
import json
import os
import re
import struct
import sys
import urllib.parse
import urllib.request
from typing import Any, Dict, List, Optional, Tuple

# Decky Loader Message Types
CALL = 0
REPLY = 1
ERROR = -1
EVENT = 3

# Default store URL
DEFAULT_STORE_URL = "https://plugins.deckbrew.xyz/plugins"

# Timeouts (seconds) for the install loop
IDLE_TIMEOUT = 60  # max gap between any two messages from server
OVERALL_TIMEOUT = 300  # hard upper bound for the whole install

# Store type mapping
STORE_TYPE_NAMES = {
    0: "default",
    1: "testing",
    2: "custom",
}

# WebSocket opcodes used by this client
_OP_CONTINUATION = 0x0
_OP_TEXT = 0x1
_OP_CLOSE = 0x8
_OP_PING = 0x9
_OP_PONG = 0xA

_LEADING_DIGITS = re.compile(r"\d+")


def log(*args: Any) -> None:
    """Print formatted logs to stderr."""
    print("[DeckyInstaller]", *args, file=sys.stderr, flush=True)


def _clear_progress_line() -> None:
    """Blank out the in-place progress bar so a log line can be printed cleanly."""
    print("\r" + " " * 30 + "\r", end="", file=sys.stderr, flush=True)


def _fetch_text(url: str, timeout: float) -> str:
    """Blocking HTTP GET returning the decoded response body."""
    with urllib.request.urlopen(url, timeout=timeout) as response:
        return response.read().decode()


async def _fetch_text_async(url: str, timeout: float) -> str:
    """Run the blocking HTTP GET in a worker thread so the event loop stays free."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, _fetch_text, url, timeout)


def _apply_mask(payload: bytes, key: bytes) -> bytes:
    """XOR *payload* with the 4-byte masking *key* (masking and unmasking are identical)."""
    return bytes(b ^ key[i % 4] for i, b in enumerate(payload))


def _build_frame(opcode: int, payload: bytes) -> bytes:
    """Build a single final (FIN=1) masked WebSocket frame, as a client must send."""
    length = len(payload)
    frame = bytearray([0x80 | opcode])
    if length < 126:
        frame.append(0x80 | length)
    elif length < 65536:
        frame.append(0x80 | 126)
        frame.extend(struct.pack("!H", length))
    else:
        frame.append(0x80 | 127)
        frame.extend(struct.pack("!Q", length))
    key = os.urandom(4)
    frame.extend(key)
    frame.extend(_apply_mask(payload, key))
    return bytes(frame)


def _version_key(name: Optional[str]) -> Tuple[Tuple[int, ...], int, str]:
    """Sort key that orders dotted version names numerically.

    A plain string sort ranks "1.9.0" above "1.10.0"; this key compares the
    numeric components as integers. Names with a "-suffix" or with non-numeric
    text in a component rank below the same numbers without it. The original
    text is the final tie-breaker so any two names remain comparable.
    """
    text = str(name or "").strip()
    core, _, prerelease = text.lstrip("vV").partition("-")
    numbers = []
    is_final = 0 if prerelease else 1
    for component in core.split("."):
        match = _LEADING_DIGITS.match(component)
        numbers.append(int(match.group()) if match else 0)
        if not component.isdigit():
            is_final = 0
    return (tuple(numbers), is_final, text)


def _find_plugin(plugins: Any, target_id: int) -> Optional[Dict[str, Any]]:
    """Return the store entry whose ``id`` equals *target_id*, or None.

    Entries that are not dicts or whose id is missing/non-numeric are skipped
    instead of aborting the whole lookup.
    """
    if not isinstance(plugins, list):
        raise RuntimeError("store response is not a list of plugins")
    wanted = int(target_id)
    for plugin in plugins:
        if not isinstance(plugin, dict):
            continue
        try:
            if int(plugin.get("id")) == wanted:
                return plugin
        except (TypeError, ValueError):
            continue
    return None


class DeckyClient:
    """
    A robust client for Decky Loader using asyncio streams.
    """

    def __init__(self, host: str = "127.0.0.1", port: int = 1337):
        self.host = host
        self.port = port
        self.reader: Optional[asyncio.StreamReader] = None
        self.writer: Optional[asyncio.StreamWriter] = None
        self.msg_id = 0  # id of the most recently sent message

    async def get_token(self) -> str:
        """Fetch the CSRF token via HTTP GET."""
        url = f"http://{self.host}:{self.port}/auth/token"
        return (await _fetch_text_async(url, 5)).strip()

    async def connect(self, token: str) -> None:
        """Connect and perform WebSocket handshake.

        Raises:
            ConnectionError: the server closed the connection mid-handshake.
            RuntimeError: the server did not answer with HTTP status 101.
        """
        self.reader, self.writer = await asyncio.open_connection(self.host, self.port)

        # Build handshake
        key = base64.b64encode(os.urandom(16)).decode()
        quoted_token = urllib.parse.quote(token, safe="")
        handshake = (
            f"GET /ws?auth={quoted_token} HTTP/1.1\r\n"
            f"Host: {self.host}:{self.port}\r\n"
            "Upgrade: websocket\r\n"
            "Connection: Upgrade\r\n"
            f"Sec-WebSocket-Key: {key}\r\n"
            "Sec-WebSocket-Version: 13\r\n\r\n"
        )
        self.writer.write(handshake.encode())
        await self.writer.drain()

        # readuntil() stops exactly at the end of the headers. Reading in
        # arbitrary chunks could swallow the first WebSocket frame if the
        # server sends it together with the handshake response.
        try:
            header_data = await self.reader.readuntil(b"\r\n\r\n")
        except asyncio.IncompleteReadError as exc:
            raise ConnectionError("Server closed connection during handshake") from exc
        except asyncio.LimitOverrunError as exc:
            raise RuntimeError("Handshake failed: response headers too large") from exc

        status_parts = header_data.split(b"\r\n", 1)[0].split()
        if len(status_parts) < 2 or status_parts[1] != b"101":
            raise RuntimeError(f"Handshake failed: {header_data.decode(errors='ignore')}")

    async def send(self, msg_type: int, method: str, args: List[Any]) -> int:
        """Send a masked WebSocket text frame.

        Returns:
            The id assigned to the message, so a caller can match the reply.

        Raises:
            ConnectionError: the client is not connected.
        """
        if self.writer is None:
            raise ConnectionError("Not connected")
        self.msg_id += 1
        call_id = self.msg_id
        message_dict = {
            "type": msg_type,
            "id": call_id,
            "route": method,
            "args": args,
        }
        payload = json.dumps(message_dict).encode()
        self.writer.write(_build_frame(_OP_TEXT, payload))
        await self.writer.drain()
        return call_id

    async def recv(self) -> Optional[Dict[str, Any]]:
        """Receive and parse one WebSocket text message.

        Pings are answered, pongs and non-text messages are skipped, and
        fragmented messages are reassembled before parsing.

        Returns:
            The decoded JSON object, or None when the connection is closed
            (close frame, EOF, connection error, or not connected).
        """
        if self.reader is None:
            return None
        fragments: List[bytes] = []
        message_opcode: Optional[int] = None
        try:
            while True:
                # Read first 2 bytes: FIN/Opcode and Mask/Length
                head = await self.reader.readexactly(2)
                fin = bool(head[0] & 0x80)
                opcode = head[0] & 0x0F
                masked = bool(head[1] & 0x80)
                length = head[1] & 0x7F

                if length == 126:
                    length = struct.unpack("!H", await self.reader.readexactly(2))[0]
                elif length == 127:
                    length = struct.unpack("!Q", await self.reader.readexactly(8))[0]

                mask_key = await self.reader.readexactly(4) if masked else None
                payload = await self.reader.readexactly(length)
                if mask_key is not None:
                    payload = _apply_mask(payload, mask_key)

                # Control frames may arrive between the fragments of a message.
                if opcode == _OP_CLOSE:
                    return None
                if opcode == _OP_PING:
                    if self.writer:
                        self.writer.write(_build_frame(_OP_PONG, payload))
                        await self.writer.drain()
                    continue
                if opcode == _OP_PONG:
                    continue

                if opcode == _OP_CONTINUATION:
                    if message_opcode is None:
                        continue  # stray continuation without a starting frame
                    fragments.append(payload)
                else:
                    message_opcode = opcode
                    fragments = [payload]

                if not fin:
                    continue  # more fragments to come

                finished_opcode, data = message_opcode, b"".join(fragments)
                message_opcode, fragments = None, []

                if finished_opcode != _OP_TEXT:  # Not a text message
                    continue

                message = json.loads(data.decode())
                if isinstance(message, dict):
                    return message
                # Callers expect a JSON object; ignore anything else.
        except (asyncio.IncompleteReadError, ConnectionError):
            return None

    async def request(
        self, method: str, args: List[Any], timeout: float = IDLE_TIMEOUT
    ) -> Dict[str, Any]:
        """Send a CALL and wait for the REPLY or ERROR that answers it.

        Events and replies carrying a different id are skipped. A reply with
        no id at all is accepted, in case the server does not echo ids.

        Raises:
            TimeoutError: no answer arrived within *timeout* seconds.
            RuntimeError: the connection was closed before an answer arrived.
        """
        call_id = await self.send(CALL, method, args)
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout
        while True:
            remaining = deadline - loop.time()
            if remaining <= 0:
                raise TimeoutError(f"No reply to {method} within {timeout}s")
            try:
                msg = await asyncio.wait_for(self.recv(), timeout=remaining)
            except asyncio.TimeoutError:
                raise TimeoutError(f"No reply to {method} within {timeout}s") from None
            if msg is None:
                raise RuntimeError("Connection closed by server")
            if msg.get("type") not in (REPLY, ERROR):
                continue
            reply_id = msg.get("id")
            if reply_id is not None and reply_id != call_id:
                continue
            return msg

    async def close(self) -> None:
        """Send a WebSocket close frame and close the stream.

        Best-effort and idempotent: failures while closing are ignored so
        they cannot hide the error that led to the close.
        """
        writer = self.writer
        if writer is None:
            return
        self.reader = None
        self.writer = None
        try:
            # Close frame with status 1000 (normal closure)
            writer.write(_build_frame(_OP_CLOSE, struct.pack("!H", 1000)))
            await writer.drain()
        except (OSError, RuntimeError):
            pass  # peer is already gone; nothing more to say
        finally:
            writer.close()
            try:
                await writer.wait_closed()
            except OSError:
                pass


async def _await_install(client: DeckyClient, install_id: int) -> bool:
    """Drive the install conversation until it finishes.

    Confirms the install prompt, renders download progress on stderr and
    returns True on success. Returns False if the server disconnects before
    the install was confirmed.

    Raises:
        TimeoutError: no message for IDLE_TIMEOUT seconds, or the whole
            install exceeded OVERALL_TIMEOUT seconds.
        RuntimeError: the server answered one of our calls with an ERROR.
    """
    pending_ids = {install_id}
    confirmed = False
    loop = asyncio.get_running_loop()
    deadline = loop.time() + OVERALL_TIMEOUT

    while True:
        remaining = deadline - loop.time()
        if remaining <= 0:
            _clear_progress_line()
            raise TimeoutError(
                f"Installation exceeded overall timeout of {OVERALL_TIMEOUT}s"
            )

        try:
            msg = await asyncio.wait_for(
                client.recv(), timeout=min(IDLE_TIMEOUT, remaining)
            )
        except asyncio.TimeoutError:
            _clear_progress_line()
            raise TimeoutError(
                f"No message from server for {IDLE_TIMEOUT}s; install appears stuck"
            ) from None

        if msg is None:
            _clear_progress_line()
            log("Connection closed by server.")
            if confirmed:
                log("Install was confirmed; treating disconnect as success.")
            return confirmed

        m_type = msg.get("type")
        event = msg.get("event") if m_type == EVENT else None

        if event == "loader/add_plugin_install_prompt":
            m_args = msg.get("args") or []
            _clear_progress_line()
            if len(m_args) < 3:
                log(f"Invalid install prompt args: {m_args}")
                continue
            log("Prompt received, sending confirmation...")
            # The third prompt argument is sent back to confirm the install.
            confirm_id = await client.send(
                CALL, "utilities/confirm_plugin_install", [m_args[2]]
            )
            pending_ids.add(confirm_id)
            confirmed = True

        elif event == "loader/plugin_download_info":
            m_args = msg.get("args") or []
            progress = m_args[0] if m_args else None
            if isinstance(progress, (int, float)) and not isinstance(progress, bool):
                filled = max(0, min(20, int(20 * progress / 100)))
                bar = "=" * filled + " " * (20 - filled)
                print(f"\r[{bar}] {progress}%", end="", file=sys.stderr, flush=True)

        elif event == "loader/plugin_download_finish":
            print(f"\r[{'=' * 20}] 100%", file=sys.stderr)
            log(f"Installation successful: {msg.get('args')}")
            return True

        elif m_type == REPLY and msg.get("result") is not None:
            _clear_progress_line()
            log(f"Server reply: {msg.get('result')}")

        elif m_type == ERROR:
            _clear_progress_line()
            log(f"Server error: {msg.get('error')}")
            # An error answering one of our calls means the install cannot
            # proceed; fail now instead of waiting for the idle timeout.
            error_id = msg.get("id")
            if error_id is None or error_id in pending_ids:
                raise RuntimeError(f"Server error: {msg.get('error')}")


async def run_installer(target_id: int, store_url: str) -> None:
    """Installation workflow.

    Looks up *target_id* in the plugin list served at *store_url*, asks the
    loader to install its latest version and confirms the install prompt.

    Raises:
        RuntimeError: plugin or artifact not found, server error, or the
            install did not complete.
        TimeoutError: the server stopped responding.
    """
    client = DeckyClient()
    try:
        log(f"Contacting Mock Server at {client.host}:{client.port}...")
        token = await client.get_token()
        await client.connect(token)

        log(f"Connection established. Fetching plugin metadata for ID: {target_id}")
        plugins = json.loads(await _fetch_text_async(store_url, 10))

        target = _find_plugin(plugins, target_id)
        if not target:
            raise RuntimeError(f"plugin id {target_id} not found")

        plugin_name = target.get("name") or f"plugin-{target_id}"
        versions = target.get("versions") or []
        if not versions:
            raise RuntimeError("store entry missing versions")

        latest = max(versions, key=lambda v: _version_key(v.get("name")))
        version_name = latest.get("name") or "dev"
        artifact_url = latest.get("artifact") or ""
        hash_ = latest.get("hash") or ""
        if not artifact_url:
            raise RuntimeError("latest version missing artifact URL")

        log(f"Installing {plugin_name} v{version_name}")
        print("[" + " " * 20 + "] 0%", end="", file=sys.stderr, flush=True)

        install_id = await client.send(
            CALL,
            "utilities/install_plugin",
            [artifact_url, plugin_name, version_name, hash_, 0],
        )
        if not await _await_install(client, install_id):
            raise RuntimeError("Installation did not complete successfully")
    except Exception as e:
        log(f"Error: {e}")
        raise
    finally:
        await client.close()


async def configure_store_url(store_url: str) -> None:
    """Configure custom store URL in Decky settings.

    Sets the ``store`` setting to 2 (custom) and ``store-url`` to *store_url*.

    Raises:
        RuntimeError: the server reported an error or closed the connection.
        TimeoutError: the server did not answer in time.
    """
    client = DeckyClient()
    try:
        log(f"Connecting to Decky server at {client.host}:{client.port}...")
        token = await client.get_token()
        await client.connect(token)

        # First, set the store type to 2 (custom)
        log("Setting store type to custom (2)...")
        reply = await client.request("utilities/settings/set", ["store", 2])
        if reply.get("type") == ERROR:
            log(f"Server error setting store type: {reply.get('error')}")
            raise RuntimeError(f"Failed to set store type: {reply.get('error')}")
        log("Store type set to custom")

        log(f"Setting custom store URL: {store_url}")
        reply = await client.request("utilities/settings/set", ["store-url", store_url])
        if reply.get("type") == ERROR:
            log(f"Server error: {reply.get('error')}")
            raise RuntimeError(f"Failed to set store URL: {reply.get('error')}")
        log(f"Store URL configured successfully: {reply.get('result')}")
    except Exception as e:
        log(f"Error: {e}")
        raise
    finally:
        await client.close()


async def get_store_url() -> str:
    """Get the configured custom store URL and type from Decky settings.

    The store type is only logged; the store URL is logged and returned.

    Raises:
        RuntimeError: the server reported an error or closed the connection.
        TimeoutError: the server did not answer in time.
    """
    client = DeckyClient()
    try:
        log(f"Connecting to Decky server at {client.host}:{client.port}...")
        token = await client.get_token()
        await client.connect(token)

        # Get store type
        log("Getting configured store type...")
        reply = await client.request("utilities/settings/get", ["store", 0])
        if reply.get("type") == ERROR:
            log(f"Server error: {reply.get('error')}")
            raise RuntimeError(f"Failed to get store type: {reply.get('error')}")
        store_type = reply.get("result")
        store_type_name = STORE_TYPE_NAMES.get(store_type, f"unknown ({store_type})")
        log(f"Current store type: {store_type_name}")

        # Get store URL
        log("Getting configured store URL...")
        reply = await client.request(
            "utilities/settings/get", ["store-url", DEFAULT_STORE_URL]
        )
        if reply.get("type") == ERROR:
            log(f"Server error: {reply.get('error')}")
            raise RuntimeError(f"Failed to get store URL: {reply.get('error')}")
        store_url = reply.get("result")
        log(f"Current store URL: {store_url}")
        return store_url
    except Exception as e:
        log(f"Error: {e}")
        raise
    finally:
        await client.close()


def _build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser with its three subcommands."""
    parser = argparse.ArgumentParser(
        description="Decky Loader Client - Manage plugins and settings",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    subparsers = parser.add_subparsers(dest="command", help="Available commands")

    # Install subcommand
    install_parser = subparsers.add_parser(
        "install",
        help="Install a plugin from the store",
    )
    install_parser.add_argument(
        "--store-url",
        default="http://127.0.0.1:1337/plugins",
        help="Plugin store URL to fetch plugins from",
    )
    install_parser.add_argument(
        "--target-id",
        type=int,
        default=42,
        help="Plugin ID to install (default: 42)",
    )

    # Configure store subcommand
    config_parser = subparsers.add_parser(
        "configure-store",
        help="Configure custom store URL in Decky settings",
    )
    config_parser.add_argument(
        "url",
        help="Custom store URL to configure",
    )

    # Get store subcommand
    subparsers.add_parser(
        "get-store",
        help="Get the configured custom store URL",
    )
    return parser


def main(argv: Optional[List[str]] = None) -> int:
    """Parse arguments, run the selected command and return the exit code."""
    parser = _build_parser()
    args = parser.parse_args(argv)

    # Execute based on subcommand
    if args.command == "install":
        command = run_installer(target_id=args.target_id, store_url=args.store_url)
    elif args.command == "configure-store":
        command = configure_store_url(args.url)
    elif args.command == "get-store":
        command = get_store_url()
    else:
        parser.print_help()
        return 1

    try:
        asyncio.run(command)
    except KeyboardInterrupt:
        log("Interrupted")
        return 130
    except Exception:
        # Each command has already logged its own error message.
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main())