From a7669799bca3ff4089ab81fde924b2d2f787cf0f Mon Sep 17 00:00:00 2001 From: TrainDoctor Date: Wed, 25 Oct 2023 19:47:33 -0700 Subject: Merge aa/type-cleanup-py (work by marios, aa, wolv) --- backend/src/browser.py | 275 ++++++++++++++++++++++++ backend/src/customtypes.py | 6 + backend/src/helpers.py | 153 +++++++++++++ backend/src/injector.py | 438 ++++++++++++++++++++++++++++++++++++++ backend/src/legacy/library.js | 84 ++++++++ backend/src/loader.py | 238 +++++++++++++++++++++ backend/src/localplatform.py | 52 +++++ backend/src/localplatformlinux.py | 192 +++++++++++++++++ backend/src/localplatformwin.py | 53 +++++ backend/src/localsocket.py | 139 ++++++++++++ backend/src/main.py | 192 +++++++++++++++++ backend/src/plugin.py | 163 ++++++++++++++ backend/src/settings.py | 60 ++++++ backend/src/updater.py | 238 +++++++++++++++++++++ backend/src/utilities.py | 373 ++++++++++++++++++++++++++++++++ 15 files changed, 2656 insertions(+) create mode 100644 backend/src/browser.py create mode 100644 backend/src/customtypes.py create mode 100644 backend/src/helpers.py create mode 100644 backend/src/injector.py create mode 100644 backend/src/legacy/library.js create mode 100644 backend/src/loader.py create mode 100644 backend/src/localplatform.py create mode 100644 backend/src/localplatformlinux.py create mode 100644 backend/src/localplatformwin.py create mode 100644 backend/src/localsocket.py create mode 100644 backend/src/main.py create mode 100644 backend/src/plugin.py create mode 100644 backend/src/settings.py create mode 100644 backend/src/updater.py create mode 100644 backend/src/utilities.py (limited to 'backend/src') diff --git a/backend/src/browser.py b/backend/src/browser.py new file mode 100644 index 00000000..da8569be --- /dev/null +++ b/backend/src/browser.py @@ -0,0 +1,275 @@ +# Full imports +import json +# import pprint +# from pprint import pformat + +# Partial imports +from aiohttp import ClientSession +from asyncio import sleep +from hashlib import sha256 +from io import BytesIO +from logging import getLogger +from os import R_OK, W_OK, path, listdir, access, mkdir +from shutil import rmtree +from time import time +from zipfile import ZipFile +from enum import IntEnum +from typing import Dict, List, TypedDict + +# Local modules +from .localplatform import chown, chmod +from .loader import Loader, Plugins +from .helpers import get_ssl_context, download_remote_binary_to_path +from .settings import SettingsManager +from .injector import get_gamepadui_tab + +logger = getLogger("Browser") + +class PluginInstallType(IntEnum): + INSTALL = 0 + REINSTALL = 1 + UPDATE = 2 + +class PluginInstallRequest(TypedDict): + name: str + artifact: str + version: str + hash: str + install_type: PluginInstallType + +class PluginInstallContext: + def __init__(self, artifact: str, name: str, version: str, hash: str) -> None: + self.artifact = artifact + self.name = name + self.version = version + self.hash = hash + +class PluginBrowser: + def __init__(self, plugin_path: str, plugins: Plugins, loader: Loader, settings: SettingsManager) -> None: + self.plugin_path = plugin_path + self.plugins = plugins + self.loader = loader + self.settings = settings + self.install_requests: Dict[str, PluginInstallContext | List[PluginInstallContext]] = {} + + def _unzip_to_plugin_dir(self, zip: BytesIO, name: str, hash: str): + zip_hash = sha256(zip.getbuffer()).hexdigest() + if hash and (zip_hash != hash): + return False + zip_file = ZipFile(zip) + zip_file.extractall(self.plugin_path) + plugin_folder = self.find_plugin_folder(name) + assert plugin_folder is not None + plugin_dir = path.join(self.plugin_path, plugin_folder) + + if not chown(plugin_dir) or not chmod(plugin_dir, 555): + logger.error(f"chown/chmod exited with a non-zero exit code") + return False + return True + + async def _download_remote_binaries_for_plugin_with_name(self, pluginBasePath: str): + rv = False + try: + packageJsonPath = path.join(pluginBasePath, 'package.json') + pluginBinPath = path.join(pluginBasePath, 'bin') + + if access(packageJsonPath, R_OK): + with open(packageJsonPath, "r", encoding="utf-8") as f: + packageJson = json.load(f) + if "remote_binary" in packageJson and len(packageJson["remote_binary"]) > 0: + # create bin directory if needed. + chmod(pluginBasePath, 777) + if access(pluginBasePath, W_OK): + if not path.exists(pluginBinPath): + mkdir(pluginBinPath) + if not access(pluginBinPath, W_OK): + chmod(pluginBinPath, 777) + + rv = True + for remoteBinary in packageJson["remote_binary"]: + # Required Fields. If any Remote Binary is missing these fail the install. + binName = remoteBinary["name"] + binURL = remoteBinary["url"] + binHash = remoteBinary["sha256hash"] + if not await download_remote_binary_to_path(binURL, binHash, path.join(pluginBinPath, binName)): + rv = False + raise Exception(f"Error Downloading Remote Binary {binName}@{binURL} with hash {binHash} to {path.join(pluginBinPath, binName)}") + + chown(self.plugin_path) + chmod(pluginBasePath, 555) + else: + rv = True + logger.debug(f"No Remote Binaries to Download") + + except Exception as e: + rv = False + logger.debug(str(e)) + + return rv + + """Return the filename (only) for the specified plugin""" + def find_plugin_folder(self, name: str) -> str | None: + for folder in listdir(self.plugin_path): + try: + with open(path.join(self.plugin_path, folder, 'plugin.json'), "r", encoding="utf-8") as f: + plugin = json.load(f) + + if plugin['name'] == name: + return folder + except: + logger.debug(f"skipping {folder}") + + async def uninstall_plugin(self, name: str): + if self.loader.watcher: + self.loader.watcher.disabled = True + tab = await get_gamepadui_tab() + plugin_folder = self.find_plugin_folder(name) + assert plugin_folder is not None + plugin_dir = path.join(self.plugin_path, plugin_folder) + try: + logger.info("uninstalling " + name) + logger.info(" at dir " + plugin_dir) + logger.debug("calling frontend unload for %s" % str(name)) + res = await tab.evaluate_js(f"DeckyPluginLoader.unloadPlugin('{name}')") + logger.debug("result of unload from UI: %s", res) + # plugins_snapshot = self.plugins.copy() + # snapshot_string = pformat(plugins_snapshot) + # logger.debug("current plugins: %s", snapshot_string) + if name in self.plugins: + logger.debug("Plugin %s was found", name) + self.plugins[name].stop() + logger.debug("Plugin %s was stopped", name) + del self.plugins[name] + logger.debug("Plugin %s was removed from the dictionary", name) + self.cleanup_plugin_settings(name) + logger.debug("removing files %s" % str(name)) + rmtree(plugin_dir) + except FileNotFoundError: + logger.warning(f"Plugin {name} not installed, skipping uninstallation") + except Exception as e: + logger.error(f"Plugin {name} in {plugin_dir} was not uninstalled") + logger.error(f"Error at {str(e)}", exc_info=e) + if self.loader.watcher: + self.loader.watcher.disabled = False + + async def _install(self, artifact: str, name: str, version: str, hash: str): + # Will be set later in code + res_zip = None + + # Check if plugin is installed + isInstalled = False + # Preserve plugin order before removing plugin (uninstall alters the order and removes the plugin from the list) + current_plugin_order = self.settings.getSetting("pluginOrder")[:] + if self.loader.watcher: + self.loader.watcher.disabled = True + try: + pluginFolderPath = self.find_plugin_folder(name) + if pluginFolderPath: + isInstalled = True + except: + logger.error(f"Failed to determine if {name} is already installed, continuing anyway.") + + # Check if the file is a local file or a URL + if artifact.startswith("file://"): + logger.info(f"Installing {name} from local ZIP file (Version: {version})") + res_zip = BytesIO(open(artifact[7:], "rb").read()) + else: + logger.info(f"Installing {name} from URL (Version: {version})") + async with ClientSession() as client: + logger.debug(f"Fetching {artifact}") + res = await client.get(artifact, ssl=get_ssl_context()) + if res.status == 200: + logger.debug("Got 200. Reading...") + data = await res.read() + logger.debug(f"Read {len(data)} bytes") + res_zip = BytesIO(data) + else: + logger.fatal(f"Could not fetch from URL. {await res.text()}") + + # Check to make sure we got the file + if res_zip is None: + logger.fatal(f"Could not fetch {artifact}") + return + + # If plugin is installed, uninstall it + if isInstalled: + try: + logger.debug("Uninstalling existing plugin...") + await self.uninstall_plugin(name) + except: + logger.error(f"Plugin {name} could not be uninstalled.") + + # Install the plugin + logger.debug("Unzipping...") + ret = self._unzip_to_plugin_dir(res_zip, name, hash) + if ret: + plugin_folder = self.find_plugin_folder(name) + assert plugin_folder is not None + plugin_dir = path.join(self.plugin_path, plugin_folder) + ret = await self._download_remote_binaries_for_plugin_with_name(plugin_dir) + if ret: + logger.info(f"Installed {name} (Version: {version})") + if name in self.loader.plugins: + self.loader.plugins[name].stop() + self.loader.plugins.pop(name, None) + await sleep(1) + if not isInstalled: + current_plugin_order = self.settings.getSetting("pluginOrder") + current_plugin_order.append(name) + self.settings.setSetting("pluginOrder", current_plugin_order) + logger.debug("Plugin %s was added to the pluginOrder setting", name) + self.loader.import_plugin(path.join(plugin_dir, "main.py"), plugin_folder) + else: + logger.fatal(f"Failed Downloading Remote Binaries") + else: + logger.fatal(f"SHA-256 Mismatch!!!! {name} (Version: {version})") + if self.loader.watcher: + self.loader.watcher.disabled = False + + async def request_plugin_install(self, artifact: str, name: str, version: str, hash: str, install_type: PluginInstallType): + request_id = str(time()) + self.install_requests[request_id] = PluginInstallContext(artifact, name, version, hash) + tab = await get_gamepadui_tab() + await tab.open_websocket() + await tab.evaluate_js(f"DeckyPluginLoader.addPluginInstallPrompt('{name}', '{version}', '{request_id}', '{hash}', {install_type})") + + async def request_multiple_plugin_installs(self, requests: List[PluginInstallRequest]): + request_id = str(time()) + self.install_requests[request_id] = [PluginInstallContext(req['artifact'], req['name'], req['version'], req['hash']) for req in requests] + js_requests_parameter = ','.join([ + f"{{ name: '{req['name']}', version: '{req['version']}', hash: '{req['hash']}', install_type: {req['install_type']}}}" for req in requests + ]) + + tab = await get_gamepadui_tab() + await tab.open_websocket() + await tab.evaluate_js(f"DeckyPluginLoader.addMultiplePluginsInstallPrompt('{request_id}', [{js_requests_parameter}])") + + async def confirm_plugin_install(self, request_id: str): + requestOrRequests = self.install_requests.pop(request_id) + if isinstance(requestOrRequests, list): + [await self._install(req.artifact, req.name, req.version, req.hash) for req in requestOrRequests] + else: + await self._install(requestOrRequests.artifact, requestOrRequests.name, requestOrRequests.version, requestOrRequests.hash) + + def cancel_plugin_install(self, request_id: str): + self.install_requests.pop(request_id) + + def cleanup_plugin_settings(self, name: str): + """Removes any settings related to a plugin. Propably called when a plugin is uninstalled. + + Args: + name (string): The name of the plugin + """ + hidden_plugins = self.settings.getSetting("hiddenPlugins", []) + if name in hidden_plugins: + hidden_plugins.remove(name) + self.settings.setSetting("hiddenPlugins", hidden_plugins) + + + plugin_order = self.settings.getSetting("pluginOrder", []) + + if name in plugin_order: + plugin_order.remove(name) + self.settings.setSetting("pluginOrder", plugin_order) + + logger.debug("Removed any settings for plugin %s", name) diff --git a/backend/src/customtypes.py b/backend/src/customtypes.py new file mode 100644 index 00000000..84ebc235 --- /dev/null +++ b/backend/src/customtypes.py @@ -0,0 +1,6 @@ +from enum import Enum + +class UserType(Enum): + HOST_USER = 1 + EFFECTIVE_USER = 2 + ROOT = 3 \ No newline at end of file diff --git a/backend/src/helpers.py b/backend/src/helpers.py new file mode 100644 index 00000000..f8796bd8 --- /dev/null +++ b/backend/src/helpers.py @@ -0,0 +1,153 @@ +import re +import ssl +import uuid +import os +import subprocess +from hashlib import sha256 +from io import BytesIO + +import certifi +from aiohttp.web import Request, Response, middleware +from aiohttp.typedefs import Handler +from aiohttp import ClientSession +from . import localplatform +from .customtypes import UserType +from logging import getLogger + +REMOTE_DEBUGGER_UNIT = "steam-web-debug-portforward.service" + +# global vars +csrf_token = str(uuid.uuid4()) +ssl_ctx = ssl.create_default_context(cafile=certifi.where()) + +assets_regex = re.compile("^/plugins/.*/assets/.*") +frontend_regex = re.compile("^/frontend/.*") +logger = getLogger("Main") + +def get_ssl_context(): + return ssl_ctx + +def get_csrf_token(): + return csrf_token + +@middleware +async def csrf_middleware(request: Request, handler: Handler): + if str(request.method) == "OPTIONS" or request.headers.get('Authentication') == csrf_token or str(request.rel_url) == "/auth/token" or str(request.rel_url).startswith("/plugins/load_main/") or str(request.rel_url).startswith("/static/") or str(request.rel_url).startswith("/legacy/") or str(request.rel_url).startswith("/steam_resource/") or str(request.rel_url).startswith("/frontend/") or assets_regex.match(str(request.rel_url)) or frontend_regex.match(str(request.rel_url)): + return await handler(request) + return Response(text='Forbidden', status=403) + +# Get the default homebrew path unless a home_path is specified. home_path argument is deprecated +def get_homebrew_path() -> str: + return localplatform.get_unprivileged_path() + +# Recursively create path and chown as user +def mkdir_as_user(path: str): + path = os.path.realpath(path) + os.makedirs(path, exist_ok=True) + localplatform.chown(path) + +# Fetches the version of loader +def get_loader_version() -> str: + try: + with open(os.path.join(os.getcwd(), ".loader.version"), "r", encoding="utf-8") as version_file: + return version_file.readline().strip() + except Exception as e: + logger.warn(f"Failed to execute get_loader_version(): {str(e)}") + return "unknown" + +# returns the appropriate system python paths +def get_system_pythonpaths() -> list[str]: + try: + # run as normal normal user if on linux to also include user python paths + proc = subprocess.run(["python3" if localplatform.ON_LINUX else "python", "-c", "import sys; print('\\n'.join(x for x in sys.path if x))"], + # TODO make this less insane + capture_output=True, user=localplatform.localplatform._get_user_id() if localplatform.ON_LINUX else None, env={} if localplatform.ON_LINUX else None) # type: ignore + return [x.strip() for x in proc.stdout.decode().strip().split("\n")] + except Exception as e: + logger.warn(f"Failed to execute get_system_pythonpaths(): {str(e)}") + return [] + +# Download Remote Binaries to local Plugin +async def download_remote_binary_to_path(url: str, binHash: str, path: str) -> bool: + rv = False + try: + if os.access(os.path.dirname(path), os.W_OK): + async with ClientSession() as client: + res = await client.get(url, ssl=get_ssl_context()) + if res.status == 200: + data = BytesIO(await res.read()) + remoteHash = sha256(data.getbuffer()).hexdigest() + if binHash == remoteHash: + data.seek(0) + with open(path, 'wb') as f: + f.write(data.getbuffer()) + rv = True + else: + raise Exception(f"Fatal Error: Hash Mismatch for remote binary {path}@{url}") + else: + rv = False + except: + rv = False + + return rv + +# Deprecated +def set_user(): + pass + +# Deprecated +def set_user_group() -> str: + return get_user_group() + +######### +# Below is legacy code, provided for backwards compatibility. This will break on windows +######### + +# Get the user id hosting the plugin loader +def get_user_id() -> int: + return localplatform.localplatform._get_user_id() # pyright: ignore [reportPrivateUsage] + +# Get the user hosting the plugin loader +def get_user() -> str: + return localplatform.localplatform._get_user() # pyright: ignore [reportPrivateUsage] + +# Get the effective user id of the running process +def get_effective_user_id() -> int: + return localplatform.localplatform._get_effective_user_id() # pyright: ignore [reportPrivateUsage] + +# Get the effective user of the running process +def get_effective_user() -> str: + return localplatform.localplatform._get_effective_user() # pyright: ignore [reportPrivateUsage] + +# Get the effective user group id of the running process +def get_effective_user_group_id() -> int: + return localplatform.localplatform._get_effective_user_group_id() # pyright: ignore [reportPrivateUsage] + +# Get the effective user group of the running process +def get_effective_user_group() -> str: + return localplatform.localplatform._get_effective_user_group() # pyright: ignore [reportPrivateUsage] + +# Get the user owner of the given file path. +def get_user_owner(file_path: str) -> str: + return localplatform.localplatform._get_user_owner(file_path) # pyright: ignore [reportPrivateUsage] + +# Get the user group of the given file path, or the user group hosting the plugin loader +def get_user_group(file_path: str | None = None) -> str: + return localplatform.localplatform._get_user_group(file_path) # pyright: ignore [reportPrivateUsage] + +# Get the group id of the user hosting the plugin loader +def get_user_group_id() -> int: + return localplatform.localplatform._get_user_group_id() # pyright: ignore [reportPrivateUsage] + +# Get the default home path unless a user is specified +def get_home_path(username: str | None = None) -> str: + return localplatform.get_home_path(UserType.ROOT if username == "root" else UserType.HOST_USER) + +async def is_systemd_unit_active(unit_name: str) -> bool: + return await localplatform.service_active(unit_name) + +async def stop_systemd_unit(unit_name: str) -> bool: + return await localplatform.service_stop(unit_name) + +async def start_systemd_unit(unit_name: str) -> bool: + return await localplatform.service_start(unit_name) diff --git a/backend/src/injector.py b/backend/src/injector.py new file mode 100644 index 00000000..a217f689 --- /dev/null +++ b/backend/src/injector.py @@ -0,0 +1,438 @@ +# Injector code from https://github.com/SteamDeckHomebrew/steamdeck-ui-inject. More info on how it works there. + +from asyncio import sleep +from logging import getLogger +from typing import Any, Callable, List, TypedDict, Dict + +from aiohttp import ClientSession +from aiohttp.client_exceptions import ClientConnectorError, ClientOSError +from asyncio.exceptions import TimeoutError +import uuid + +BASE_ADDRESS = "http://localhost:8080" + +logger = getLogger("Injector") + +class _TabResponse(TypedDict): + title: str + id: str + url: str + webSocketDebuggerUrl: str + +class Tab: + cmd_id = 0 + + def __init__(self, res: _TabResponse) -> None: + self.title: str = res["title"] + self.id: str = res["id"] + self.url: str = res["url"] + self.ws_url: str = res["webSocketDebuggerUrl"] + + self.websocket = None + self.client = None + + async def open_websocket(self): + self.client = ClientSession() + self.websocket = await self.client.ws_connect(self.ws_url) # type: ignore + + async def close_websocket(self): + if self.websocket: + await self.websocket.close() + if self.client: + await self.client.close() + + async def listen_for_message(self): + if self.websocket: + async for message in self.websocket: + data = message.json() + yield data + logger.warn(f"The Tab {self.title} socket has been disconnected while listening for messages.") + await self.close_websocket() + + async def _send_devtools_cmd(self, dc: Dict[str, Any], receive: bool = True): + if self.websocket: + self.cmd_id += 1 + dc["id"] = self.cmd_id + await self.websocket.send_json(dc) + if receive: + async for msg in self.listen_for_message(): + if "id" in msg and msg["id"] == dc["id"]: + return msg + return None + raise RuntimeError("Websocket not opened") + + async def evaluate_js(self, js: str, run_async: bool | None = False, manage_socket: bool | None = True, get_result: bool = True): + try: + if manage_socket: + await self.open_websocket() + + res = await self._send_devtools_cmd({ + "method": "Runtime.evaluate", + "params": { + "expression": js, + "userGesture": True, + "awaitPromise": run_async + } + }, get_result) + + finally: + if manage_socket: + await self.close_websocket() + return res + + async def has_global_var(self, var_name: str, manage_socket: bool = True): + res = await self.evaluate_js(f"window['{var_name}'] !== null && window['{var_name}'] !== undefined", False, manage_socket) + assert res is not None + + if not "result" in res or not "result" in res["result"] or not "value" in res["result"]["result"]: + return False + + return res["result"]["result"]["value"] + + async def close(self, manage_socket: bool = True): + try: + if manage_socket: + await self.open_websocket() + + res = await self._send_devtools_cmd({ + "method": "Page.close", + }, False) + + finally: + if manage_socket: + await self.close_websocket() + return res + + async def enable(self): + """ + Enables page domain notifications. + """ + await self._send_devtools_cmd({ + "method": "Page.enable", + }, False) + + async def disable(self): + """ + Disables page domain notifications. + """ + await self._send_devtools_cmd({ + "method": "Page.disable", + }, False) + + async def refresh(self, manage_socket: bool = True): + try: + if manage_socket: + await self.open_websocket() + + await self._send_devtools_cmd({ + "method": "Page.reload", + }, False) + + finally: + if manage_socket: + await self.close_websocket() + + return + async def reload_and_evaluate(self, js: str, manage_socket: bool = True): + """ + Reloads the current tab, with JS to run on load via debugger + """ + try: + if manage_socket: + await self.open_websocket() + + await self._send_devtools_cmd({ + "method": "Debugger.enable" + }, True) + + await self._send_devtools_cmd({ + "method": "Runtime.evaluate", + "params": { + "expression": "location.reload();", + "userGesture": True, + "awaitPromise": False + } + }, False) + + breakpoint_res = await self._send_devtools_cmd({ + "method": "Debugger.setInstrumentationBreakpoint", + "params": { + "instrumentation": "beforeScriptExecution" + } + }, True) + + assert breakpoint_res is not None + + logger.info(breakpoint_res) + + # Page finishes loading when breakpoint hits + + for _ in range(20): + # this works around 1/5 of the time, so just send it 8 times. + # the js accounts for being injected multiple times allowing only one instance to run at a time anyway + await self._send_devtools_cmd({ + "method": "Runtime.evaluate", + "params": { + "expression": js, + "userGesture": True, + "awaitPromise": False + } + }, False) + + await self._send_devtools_cmd({ + "method": "Debugger.removeBreakpoint", + "params": { + "breakpointId": breakpoint_res["result"]["breakpointId"] + } + }, False) + + for _ in range(4): + await self._send_devtools_cmd({ + "method": "Debugger.resume" + }, False) + + await self._send_devtools_cmd({ + "method": "Debugger.disable" + }, True) + + finally: + if manage_socket: + await self.close_websocket() + return + + async def add_script_to_evaluate_on_new_document(self, js: str, add_dom_wrapper: bool = True, manage_socket: bool = True, get_result: bool = True): + """ + How the underlying call functions is not particularly clear from the devtools docs, so stealing puppeteer's description: + + Adds a function which would be invoked in one of the following scenarios: + * whenever the page is navigated + * whenever the child frame is attached or navigated. In this case, the + function is invoked in the context of the newly attached frame. + + The function is invoked after the document was created but before any of + its scripts were run. This is useful to amend the JavaScript environment, + e.g. to seed `Math.random`. + + Parameters + ---------- + js : str + The script to evaluate on new document + add_dom_wrapper : bool + True to wrap the script in a wait for the 'DOMContentLoaded' event. + DOM will usually not exist when this execution happens, + so it is necessary to delay til DOM is loaded if you are modifying it + manage_socket : bool + True to have this function handle opening/closing the websocket for this tab + get_result : bool + True to wait for the result of this call + + Returns + ------- + int or None + The identifier of the script added, used to remove it later. + (see remove_script_to_evaluate_on_new_document below) + None is returned if `get_result` is False + """ + try: + + wrappedjs = """ + function scriptFunc() { + {js} + } + if (document.readyState === 'loading') { + addEventListener('DOMContentLoaded', () => { + scriptFunc(); + }); + } else { + scriptFunc(); + } + """.format(js=js) if add_dom_wrapper else js + + if manage_socket: + await self.open_websocket() + + res = await self._send_devtools_cmd({ + "method": "Page.addScriptToEvaluateOnNewDocument", + "params": { + "source": wrappedjs + } + }, get_result) + + finally: + if manage_socket: + await self.close_websocket() + return res + + async def remove_script_to_evaluate_on_new_document(self, script_id: str, manage_socket: bool = True): + """ + Removes a script from a page that was added with `add_script_to_evaluate_on_new_document` + + Parameters + ---------- + script_id : int + The identifier of the script to remove (returned from `add_script_to_evaluate_on_new_document`) + """ + + try: + if manage_socket: + await self.open_websocket() + + await self._send_devtools_cmd({ + "method": "Page.removeScriptToEvaluateOnNewDocument", + "params": { + "identifier": script_id + } + }, False) + + finally: + if manage_socket: + await self.close_websocket() + + async def has_element(self, element_name: str, manage_socket: bool = True): + res = await self.evaluate_js(f"document.getElementById('{element_name}') != null", False, manage_socket) + assert res is not None + + if not "result" in res or not "result" in res["result"] or not "value" in res["result"]["result"]: + return False + + return res["result"]["result"]["value"] + + async def inject_css(self, style: str, manage_socket: bool = True): + try: + css_id = str(uuid.uuid4()) + + result = await self.evaluate_js( + f""" + (function() {{ + const style = document.createElement('style'); + style.id = "{css_id}"; + document.head.append(style); + style.textContent = `{style}`; + }})() + """, False, manage_socket) + + assert result is not None + + if "exceptionDetails" in result["result"]: + return { + "success": False, + "result": result["result"] + } + + return { + "success": True, + "result": css_id + } + except Exception as e: + return { + "success": False, + "result": e + } + + async def remove_css(self, css_id: str, manage_socket: bool = True): + try: + result = await self.evaluate_js( + f""" + (function() {{ + let style = document.getElementById("{css_id}"); + + if (style.nodeName.toLowerCase() == 'style') + style.parentNode.removeChild(style); + }})() + """, False, manage_socket) + + assert result is not None + + if "exceptionDetails" in result["result"]: + return { + "success": False, + "result": result + } + + return { + "success": True + } + except Exception as e: + return { + "success": False, + "result": e + } + + async def get_steam_resource(self, url: str): + res = await self.evaluate_js(f'(async function test() {{ return await (await fetch("{url}")).text() }})()', True) + assert res is not None + return res["result"]["result"]["value"] + + def __repr__(self): + return self.title + + +async def get_tabs() -> List[Tab]: + res = {} + + na = False + while True: + try: + async with ClientSession() as web: + res = await web.get(f"{BASE_ADDRESS}/json", timeout=3) + except ClientConnectorError: + if not na: + logger.debug("Steam isn't available yet. Wait for a moment...") + na = True + await sleep(5) + except ClientOSError: + logger.warn(f"The request to {BASE_ADDRESS}/json was reset") + await sleep(1) + except TimeoutError: + logger.warn(f"The request to {BASE_ADDRESS}/json timed out") + await sleep(1) + else: + break + + if res.status == 200: + r = await res.json() + return [Tab(i) for i in r] + else: + raise Exception(f"/json did not return 200. {await res.text()}") + + +async def get_tab(tab_name: str) -> Tab: + tabs = await get_tabs() + tab = next((i for i in tabs if i.title == tab_name), None) + if not tab: + raise ValueError(f"Tab {tab_name} not found") + return tab + +async def get_tab_lambda(test: Callable[[Tab], bool]) -> Tab: + tabs = await get_tabs() + tab = next((i for i in tabs if test(i)), None) + if not tab: + raise ValueError(f"Tab not found by lambda") + return tab + +SHARED_CTX_NAMES = ["SharedJSContext", "Steam Shared Context presented by Valveā„¢", "Steam", "SP"] +CLOSEABLE_URLS = ["about:blank", "data:text/html,%3Cbody%3E%3C%2Fbody%3E"] # Closing anything other than these *really* likes to crash Steam +DO_NOT_CLOSE_URL = "Valve Steam Gamepad/default" # Steam Big Picture Mode tab + +def tab_is_gamepadui(t: Tab) -> bool: + return "https://steamloopback.host/routes/" in t.url and t.title in SHARED_CTX_NAMES + +async def get_gamepadui_tab() -> Tab: + tabs = await get_tabs() + tab = next((i for i in tabs if tab_is_gamepadui(i)), None) + if not tab: + raise ValueError(f"GamepadUI Tab not found") + return tab + +async def inject_to_tab(tab_name: str, js: str, run_async: bool = False): + tab = await get_tab(tab_name) + + return await tab.evaluate_js(js, run_async) + +async def close_old_tabs(): + tabs = await get_tabs() + for t in tabs: + if not t.title or (t.title not in SHARED_CTX_NAMES and any(url in t.url for url in CLOSEABLE_URLS) and DO_NOT_CLOSE_URL not in t.url): + logger.debug("Closing tab: " + getattr(t, "title", "Untitled")) + await t.close() + await sleep(0.5) diff --git a/backend/src/legacy/library.js b/backend/src/legacy/library.js new file mode 100644 index 00000000..17f4e46f --- /dev/null +++ b/backend/src/legacy/library.js @@ -0,0 +1,84 @@ +class PluginEventTarget extends EventTarget { } +method_call_ev_target = new PluginEventTarget(); + +window.addEventListener("message", function(evt) { + let ev = new Event(evt.data.call_id); + ev.data = evt.data.result; + method_call_ev_target.dispatchEvent(ev); +}, false); + +async function call_server_method(method_name, arg_object={}) { + const token = await fetch("http://127.0.0.1:1337/auth/token").then(r => r.text()); + const response = await fetch(`http://127.0.0.1:1337/methods/${method_name}`, { + method: 'POST', + credentials: "include", + headers: { + 'Content-Type': 'application/json', + Authentication: token + }, + body: JSON.stringify(arg_object), + }); + + const dta = await response.json(); + if (!dta.success) throw dta.result; + return dta.result; +} + +// Source: https://stackoverflow.com/a/2117523 Thanks! +function uuidv4() { + return ([1e7]+-1e3+-4e3+-8e3+-1e11).replace(/[018]/g, c => + (c ^ crypto.getRandomValues(new Uint8Array(1))[0] & 15 >> c / 4).toString(16) + ); +} + +async function fetch_nocors(url, request={}) { + let args = { method: "POST", headers: {}, body: "" }; + request = {...args, ...request}; + request.url = url; + request.data = request.body; + delete request.body; //maintain api-compatibility with fetch + return await call_server_method("http_request", request); +} + +async function call_plugin_method(method_name, arg_object={}) { + if (plugin_name == undefined) + throw new Error("Plugin methods can only be called from inside plugins (duh)"); + const token = await fetch("http://127.0.0.1:1337/auth/token").then(r => r.text()); + const response = await fetch(`http://127.0.0.1:1337/plugins/${plugin_name}/methods/${method_name}`, { + method: 'POST', + credentials: "include", + headers: { + 'Content-Type': 'application/json', + Authentication: token + }, + body: JSON.stringify({ + args: arg_object, + }), + }); + + const dta = await response.json(); + if (!dta.success) throw dta.result; + return dta.result; +} + +async function execute_in_tab(tab, run_async, code) { + return await call_server_method("execute_in_tab", { + 'tab': tab, + 'run_async': run_async, + 'code': code + }); +} + +async function inject_css_into_tab(tab, style) { + return await call_server_method("inject_css_into_tab", { + 'tab': tab, + 'style': style + }); +} + +async function remove_css_from_tab(tab, css_id) { + return await call_server_method("remove_css_from_tab", { + 'tab': tab, + 'css_id': css_id + }); +} \ No newline at end of file diff --git a/backend/src/loader.py b/backend/src/loader.py new file mode 100644 index 00000000..e59cbcaf --- /dev/null +++ b/backend/src/loader.py @@ -0,0 +1,238 @@ +from __future__ import annotations +from asyncio import AbstractEventLoop, Queue, sleep +from json.decoder import JSONDecodeError +from logging import getLogger +from os import listdir, path +from pathlib import Path +from traceback import print_exc +from typing import Any, Tuple + +from aiohttp import web +from os.path import exists +from watchdog.events import RegexMatchingEventHandler, DirCreatedEvent, DirModifiedEvent, FileCreatedEvent, FileModifiedEvent # type: ignore +from watchdog.observers import Observer # type: ignore + +from typing import TYPE_CHECKING +if TYPE_CHECKING: + from .main import PluginManager + +from .injector import get_tab, get_gamepadui_tab +from .plugin import PluginWrapper + +Plugins = dict[str, PluginWrapper] +ReloadQueue = Queue[Tuple[str, str, bool | None] | Tuple[str, str]] + +class FileChangeHandler(RegexMatchingEventHandler): + def __init__(self, queue: ReloadQueue, plugin_path: str) -> None: + super().__init__(regexes=[r'^.*?dist\/index\.js$', r'^.*?main\.py$']) # type: ignore + self.logger = getLogger("file-watcher") + self.plugin_path = plugin_path + self.queue = queue + self.disabled = True + + def maybe_reload(self, src_path: str): + if self.disabled: + return + plugin_dir = Path(path.relpath(src_path, self.plugin_path)).parts[0] + if exists(path.join(self.plugin_path, plugin_dir, "plugin.json")): + self.queue.put_nowait((path.join(self.plugin_path, plugin_dir, "main.py"), plugin_dir, True)) + + def on_created(self, event: DirCreatedEvent | FileCreatedEvent): + src_path = event.src_path + if "__pycache__" in src_path: + return + + # check to make sure this isn't a directory + if path.isdir(src_path): + return + + # get the directory name of the plugin so that we can find its "main.py" and reload it; the + # file that changed is not necessarily the one that needs to be reloaded + self.logger.debug(f"file created: {src_path}") + self.maybe_reload(src_path) + + def on_modified(self, event: DirModifiedEvent | FileModifiedEvent): + src_path = event.src_path + if "__pycache__" in src_path: + return + + # check to make sure this isn't a directory + if path.isdir(src_path): + return + + # get the directory name of the plugin so that we can find its "main.py" and reload it; the + # file that changed is not necessarily the one that needs to be reloaded + self.logger.debug(f"file modified: {src_path}") + self.maybe_reload(src_path) + +class Loader: + def __init__(self, server_instance: PluginManager, plugin_path: str, loop: AbstractEventLoop, live_reload: bool = False) -> None: + self.loop = loop + self.logger = getLogger("Loader") + self.plugin_path = plugin_path + self.logger.info(f"plugin_path: {self.plugin_path}") + self.plugins: Plugins = {} + self.watcher = None + self.live_reload = live_reload + self.reload_queue: ReloadQueue = Queue() + self.loop.create_task(self.handle_reloads()) + + if live_reload: + self.observer = Observer() + self.watcher = FileChangeHandler(self.reload_queue, plugin_path) + self.observer.schedule(self.watcher, self.plugin_path, recursive=True) # type: ignore + self.observer.start() + self.loop.create_task(self.enable_reload_wait()) + + server_instance.web_app.add_routes([ + web.get("/frontend/{path:.*}", self.handle_frontend_assets), + web.get("/locales/{path:.*}", self.handle_frontend_locales), + web.get("/plugins", self.get_plugins), + web.get("/plugins/{plugin_name}/frontend_bundle", self.handle_frontend_bundle), + web.post("/plugins/{plugin_name}/methods/{method_name}", self.handle_plugin_method_call), + web.get("/plugins/{plugin_name}/assets/{path:.*}", self.handle_plugin_frontend_assets), + web.post("/plugins/{plugin_name}/reload", self.handle_backend_reload_request), + + # The following is legacy plugin code. + web.get("/plugins/load_main/{name}", self.load_plugin_main_view), + web.get("/plugins/plugin_resource/{name}/{path:.+}", self.handle_sub_route), + web.get("/steam_resource/{path:.+}", self.get_steam_resource) + ]) + + async def enable_reload_wait(self): + if self.live_reload: + await sleep(10) + if self.watcher: + self.logger.info("Hot reload enabled") + self.watcher.disabled = False + + async def handle_frontend_assets(self, request: web.Request): + file = path.join(path.dirname(__file__), "..", "static", request.match_info["path"]) + + return web.FileResponse(file, headers={"Cache-Control": "no-cache"}) + + async def handle_frontend_locales(self, request: web.Request): + req_lang = request.match_info["path"] + file = path.join(path.dirname(__file__), "..", "locales", req_lang) + if exists(file): + return web.FileResponse(file, headers={"Cache-Control": "no-cache", "Content-Type": "application/json"}) + else: + self.logger.info(f"Language {req_lang} not available, returning an empty dictionary") + return web.json_response(data={}, headers={"Cache-Control": "no-cache"}) + + async def get_plugins(self, request: web.Request): + plugins = list(self.plugins.values()) + return web.json_response([{"name": str(i) if not i.legacy else "$LEGACY_"+str(i), "version": i.version} for i in plugins]) + + async def handle_plugin_frontend_assets(self, request: web.Request): + plugin = self.plugins[request.match_info["plugin_name"]] + file = path.join(self.plugin_path, plugin.plugin_directory, "dist/assets", request.match_info["path"]) + + return web.FileResponse(file, headers={"Cache-Control": "no-cache"}) + + async def handle_frontend_bundle(self, request: web.Request): + plugin = self.plugins[request.match_info["plugin_name"]] + + with open(path.join(self.plugin_path, plugin.plugin_directory, "dist/index.js"), "r", encoding="utf-8") as bundle: + return web.Response(text=bundle.read(), content_type="application/javascript") + + def import_plugin(self, file: str, plugin_directory: str, refresh: bool | None = False, batch: bool | None = False): + try: + plugin = PluginWrapper(file, plugin_directory, self.plugin_path) + if plugin.name in self.plugins: + if not "debug" in plugin.flags and refresh: + self.logger.info(f"Plugin {plugin.name} is already loaded and has requested to not be re-loaded") + return + else: + self.plugins[plugin.name].stop() + self.plugins.pop(plugin.name, None) + if plugin.passive: + self.logger.info(f"Plugin {plugin.name} is passive") + self.plugins[plugin.name] = plugin.start() + self.logger.info(f"Loaded {plugin.name}") + if not batch: + self.loop.create_task(self.dispatch_plugin(plugin.name if not plugin.legacy else "$LEGACY_" + plugin.name, plugin.version)) + except Exception as e: + self.logger.error(f"Could not load {file}. {e}") + print_exc() + + async def dispatch_plugin(self, name: str, version: str | None): + gpui_tab = await get_gamepadui_tab() + await gpui_tab.evaluate_js(f"window.importDeckyPlugin('{name}', '{version}')") + + def import_plugins(self): + self.logger.info(f"import plugins from {self.plugin_path}") + + directories = [i for i in listdir(self.plugin_path) if path.isdir(path.join(self.plugin_path, i)) and path.isfile(path.join(self.plugin_path, i, "plugin.json"))] + for directory in directories: + self.logger.info(f"found plugin: {directory}") + self.import_plugin(path.join(self.plugin_path, directory, "main.py"), directory, False, True) + + async def handle_reloads(self): + while True: + args = await self.reload_queue.get() + self.import_plugin(*args) # type: ignore + + async def handle_plugin_method_call(self, request: web.Request): + res = {} + plugin = self.plugins[request.match_info["plugin_name"]] + method_name = request.match_info["method_name"] + try: + method_info = await request.json() + args: Any = method_info["args"] + except JSONDecodeError: + args = {} + try: + if method_name.startswith("_"): + raise RuntimeError("Tried to call private method") + res["result"] = await plugin.execute_method(method_name, args) + res["success"] = True + except Exception as e: + res["result"] = str(e) + res["success"] = False + return web.json_response(res) + + """ + The following methods are used to load legacy plugins, which are considered deprecated. + I made the choice to re-add them so that the first iteration/version of the react loader + can work as a drop-in replacement for the stable branch of the PluginLoader, so that we + can introduce it more smoothly and give people the chance to sample the new features even + without plugin support. They will be removed once legacy plugins are no longer relevant. + """ + async def load_plugin_main_view(self, request: web.Request): + plugin = self.plugins[request.match_info["name"]] + with open(path.join(self.plugin_path, plugin.plugin_directory, plugin.main_view_html), "r", encoding="utf-8") as template: + template_data = template.read() + ret = f""" + + + + {template_data} + """ + return web.Response(text=ret, content_type="text/html") + + async def handle_sub_route(self, request: web.Request): + plugin = self.plugins[request.match_info["name"]] + route_path = request.match_info["path"] + self.logger.info(path) + ret = "" + file_path = path.join(self.plugin_path, plugin.plugin_directory, route_path) + with open(file_path, "r", encoding="utf-8") as resource_data: + ret = resource_data.read() + + return web.Response(text=ret) + + async def get_steam_resource(self, request: web.Request): + tab = await get_tab("SP") + try: + return web.Response(text=await tab.get_steam_resource(f"https://steamloopback.host/{request.match_info['path']}"), content_type="text/html") + except Exception as e: + return web.Response(text=str(e), status=400) + + async def handle_backend_reload_request(self, request: web.Request): + plugin_name : str = request.match_info["plugin_name"] + plugin = self.plugins[plugin_name] + + await self.reload_queue.put((plugin.file, plugin.plugin_directory)) + + return web.Response(status=200) \ No newline at end of file diff --git a/backend/src/localplatform.py b/backend/src/localplatform.py new file mode 100644 index 00000000..028eff8f --- /dev/null +++ b/backend/src/localplatform.py @@ -0,0 +1,52 @@ +import platform, os + +ON_WINDOWS = platform.system() == "Windows" +ON_LINUX = not ON_WINDOWS + +if ON_WINDOWS: + from .localplatformwin import * + from . import localplatformwin as localplatform +else: + from .localplatformlinux import * + from . import localplatformlinux as localplatform + +def get_privileged_path() -> str: + '''Get path accessible by elevated user. Holds plugins, decky loader and decky loader configs''' + return localplatform.get_privileged_path() + +def get_unprivileged_path() -> str: + '''Get path accessible by non-elevated user. Holds plugin configuration, plugin data and plugin logs. Externally referred to as the 'Homebrew' directory''' + return localplatform.get_unprivileged_path() + +def get_unprivileged_user() -> str: + '''Get user that should own files made in unprivileged path''' + return localplatform.get_unprivileged_user() + +def get_chown_plugin_path() -> bool: + return os.getenv("CHOWN_PLUGIN_PATH", "1") == "1" + +def get_server_host() -> str: + return os.getenv("SERVER_HOST", "127.0.0.1") + +def get_server_port() -> int: + return int(os.getenv("SERVER_PORT", "1337")) + +def get_live_reload() -> bool: + return os.getenv("LIVE_RELOAD", "1") == "1" + +def get_keep_systemd_service() -> bool: + return os.getenv("KEEP_SYSTEMD_SERVICE", "0") == "1" + +def get_log_level() -> int: + return {"CRITICAL": 50, "ERROR": 40, "WARNING": 30, "INFO": 20, "DEBUG": 10}[ + os.getenv("LOG_LEVEL", "INFO") + ] + +def get_selinux() -> bool: + if ON_LINUX: + from subprocess import check_output + try: + if (check_output("getenforce").decode("ascii").strip("\n") == "Enforcing"): return True + except FileNotFoundError: + pass + return False diff --git a/backend/src/localplatformlinux.py b/backend/src/localplatformlinux.py new file mode 100644 index 00000000..bde2caac --- /dev/null +++ b/backend/src/localplatformlinux.py @@ -0,0 +1,192 @@ +import os, pwd, grp, sys, logging +from subprocess import call, run, DEVNULL, PIPE, STDOUT +from .customtypes import UserType + +logger = logging.getLogger("localplatform") + +# Get the user id hosting the plugin loader +def _get_user_id() -> int: + return pwd.getpwnam(_get_user()).pw_uid + +# Get the user hosting the plugin loader +def _get_user() -> str: + return get_unprivileged_user() + +# Get the effective user id of the running process +def _get_effective_user_id() -> int: + return os.geteuid() + +# Get the effective user of the running process +def _get_effective_user() -> str: + return pwd.getpwuid(_get_effective_user_id()).pw_name + +# Get the effective user group id of the running process +def _get_effective_user_group_id() -> int: + return os.getegid() + +# Get the effective user group of the running process +def _get_effective_user_group() -> str: + return grp.getgrgid(_get_effective_user_group_id()).gr_name + +# Get the user owner of the given file path. +def _get_user_owner(file_path: str) -> str: + return pwd.getpwuid(os.stat(file_path).st_uid).pw_name + +# Get the user group of the given file path, or the user group hosting the plugin loader +def _get_user_group(file_path: str | None = None) -> str: + return grp.getgrgid(os.stat(file_path).st_gid if file_path is not None else _get_user_group_id()).gr_name + +# Get the group id of the user hosting the plugin loader +def _get_user_group_id() -> int: + return pwd.getpwuid(_get_user_id()).pw_gid + +def chown(path : str, user : UserType = UserType.HOST_USER, recursive : bool = True) -> bool: + user_str = "" + + if user == UserType.HOST_USER: + user_str = _get_user()+":"+_get_user_group() + elif user == UserType.EFFECTIVE_USER: + user_str = _get_effective_user()+":"+_get_effective_user_group() + elif user == UserType.ROOT: + user_str = "root:root" + else: + raise Exception("Unknown User Type") + + result = call(["chown", "-R", user_str, path] if recursive else ["chown", user_str, path]) + return result == 0 + +def chmod(path : str, permissions : int, recursive : bool = True) -> bool: + if _get_effective_user_id() != 0: + return True + result = call(["chmod", "-R", str(permissions), path] if recursive else ["chmod", str(permissions), path]) + return result == 0 + +def folder_owner(path : str) -> UserType|None: + user_owner = _get_user_owner(path) + + if (user_owner == _get_user()): + return UserType.HOST_USER + + elif (user_owner == _get_effective_user()): + return UserType.EFFECTIVE_USER + + else: + return None + +def get_home_path(user : UserType = UserType.HOST_USER) -> str: + user_name = "root" + + if user == UserType.HOST_USER: + user_name = _get_user() + elif user == UserType.EFFECTIVE_USER: + user_name = _get_effective_user() + elif user == UserType.ROOT: + pass + else: + raise Exception("Unknown User Type") + + return pwd.getpwnam(user_name).pw_dir + +def get_username() -> str: + return _get_user() + +def setgid(user : UserType = UserType.HOST_USER): + user_id = 0 + + if user == UserType.HOST_USER: + user_id = _get_user_group_id() + elif user == UserType.ROOT: + pass + else: + raise Exception("Unknown user type") + + os.setgid(user_id) + +def setuid(user : UserType = UserType.HOST_USER): + user_id = 0 + + if user == UserType.HOST_USER: + user_id = _get_user_id() + elif user == UserType.ROOT: + pass + else: + raise Exception("Unknown user type") + + os.setuid(user_id) + +async def service_active(service_name : str) -> bool: + res = run(["systemctl", "is-active", service_name], stdout=DEVNULL, stderr=DEVNULL) + return res.returncode == 0 + +async def service_restart(service_name : str) -> bool: + call(["systemctl", "daemon-reload"]) + cmd = ["systemctl", "restart", service_name] + res = run(cmd, stdout=PIPE, stderr=STDOUT) + return res.returncode == 0 + +async def service_stop(service_name : str) -> bool: + cmd = ["systemctl", "stop", service_name] + res = run(cmd, stdout=PIPE, stderr=STDOUT) + return res.returncode == 0 + +async def service_start(service_name : str) -> bool: + cmd = ["systemctl", "start", service_name] + res = run(cmd, stdout=PIPE, stderr=STDOUT) + return res.returncode == 0 + +def get_privileged_path() -> str: + path = os.getenv("PRIVILEGED_PATH") + + if path == None: + path = get_unprivileged_path() + + return path + +def _parent_dir(path : str | None) -> str | None: + if path == None: + return None + + if path.endswith('/'): + path = path[:-1] + + return os.path.dirname(path) + +def get_unprivileged_path() -> str: + path = os.getenv("UNPRIVILEGED_PATH") + + if path == None: + path = _parent_dir(os.getenv("PLUGIN_PATH")) + + if path == None: + logger.debug("Unprivileged path is not properly configured. Making something up!") + # Expected path of loader binary is /home/deck/homebrew/service/PluginLoader + path = _parent_dir(_parent_dir(os.path.realpath(sys.argv[0]))) + + if path != None and not os.path.exists(path): + path = None + + if path == None: + logger.warn("Unprivileged path is not properly configured. Defaulting to /home/deck/homebrew") + path = "/home/deck/homebrew" # We give up + + return path + + +def get_unprivileged_user() -> str: + user = os.getenv("UNPRIVILEGED_USER") + + if user == None: + # Lets hope we can extract it from the unprivileged dir + dir = os.path.realpath(get_unprivileged_path()) + + pws = sorted(pwd.getpwall(), reverse=True, key=lambda pw: len(pw.pw_dir)) + for pw in pws: + if dir.startswith(os.path.realpath(pw.pw_dir)): + user = pw.pw_name + break + + if user == None: + logger.warn("Unprivileged user is not properly configured. Defaulting to 'deck'") + user = 'deck' + + return user diff --git a/backend/src/localplatformwin.py b/backend/src/localplatformwin.py new file mode 100644 index 00000000..4c4e9439 --- /dev/null +++ b/backend/src/localplatformwin.py @@ -0,0 +1,53 @@ +from .customtypes import UserType +import os, sys + +def chown(path : str, user : UserType = UserType.HOST_USER, recursive : bool = True) -> bool: + return True # Stubbed + +def chmod(path : str, permissions : int, recursive : bool = True) -> bool: + return True # Stubbed + +def folder_owner(path : str) -> UserType|None: + return UserType.HOST_USER # Stubbed + +def get_home_path(user : UserType = UserType.HOST_USER) -> str: + return os.path.expanduser("~") # Mostly stubbed + +def setgid(user : UserType = UserType.HOST_USER): + pass # Stubbed + +def setuid(user : UserType = UserType.HOST_USER): + pass # Stubbed + +async def service_active(service_name : str) -> bool: + return True # Stubbed + +async def service_stop(service_name : str) -> bool: + return True # Stubbed + +async def service_start(service_name : str) -> bool: + return True # Stubbed + +async def service_restart(service_name : str) -> bool: + if service_name == "plugin_loader": + sys.exit(42) + + return True # Stubbed + +def get_username() -> str: + return os.getlogin() + +def get_privileged_path() -> str: + '''On windows, privileged_path is equal to unprivileged_path''' + return get_unprivileged_path() + +def get_unprivileged_path() -> str: + path = os.getenv("UNPRIVILEGED_PATH") + + if path == None: + path = os.getenv("PRIVILEGED_PATH", os.path.join(os.path.expanduser("~"), "homebrew")) + + return path + +def get_unprivileged_user() -> str: + return os.getenv("UNPRIVILEGED_USER", os.getlogin()) diff --git a/backend/src/localsocket.py b/backend/src/localsocket.py new file mode 100644 index 00000000..f38fe5e7 --- /dev/null +++ b/backend/src/localsocket.py @@ -0,0 +1,139 @@ +import asyncio, time +from typing import Awaitable, Callable +import random + +from .localplatform import ON_WINDOWS + +BUFFER_LIMIT = 2 ** 20 # 1 MiB + +class UnixSocket: + def __init__(self, on_new_message: Callable[[str], Awaitable[str|None]]): + ''' + on_new_message takes 1 string argument. + It's return value gets used, if not None, to write data to the socket. + Method should be async + ''' + self.socket_addr = f"/tmp/plugin_socket_{time.time()}" + self.on_new_message = on_new_message + self.socket = None + self.reader = None + self.writer = None + + async def setup_server(self): + self.socket = await asyncio.start_unix_server(self._listen_for_method_call, path=self.socket_addr, limit=BUFFER_LIMIT) + + async def _open_socket_if_not_exists(self): + if not self.reader: + retries = 0 + while retries < 10: + try: + self.reader, self.writer = await asyncio.open_unix_connection(self.socket_addr, limit=BUFFER_LIMIT) + return True + except: + await asyncio.sleep(2) + retries += 1 + return False + else: + return True + + async def get_socket_connection(self): + if not await self._open_socket_if_not_exists(): + return None, None + + return self.reader, self.writer + + async def close_socket_connection(self): + if self.writer != None: + self.writer.close() + + self.reader = None + + async def read_single_line(self) -> str|None: + reader, _ = await self.get_socket_connection() + + try: + assert reader + except AssertionError: + return + + return await self._read_single_line(reader) + + async def write_single_line(self, message : str): + _, writer = await self.get_socket_connection() + + try: + assert writer + except AssertionError: + return + + await self._write_single_line(writer, message) + + async def _read_single_line(self, reader: asyncio.StreamReader) -> str: + line = bytearray() + while True: + try: + line.extend(await reader.readuntil()) + except asyncio.LimitOverrunError: + line.extend(await reader.read(reader._limit)) # type: ignore + continue + except asyncio.IncompleteReadError as err: + line.extend(err.partial) + break + else: + break + + return line.decode("utf-8") + + async def _write_single_line(self, writer: asyncio.StreamWriter, message : str): + if not message.endswith("\n"): + message += "\n" + + writer.write(message.encode("utf-8")) + await writer.drain() + + async def _listen_for_method_call(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter): + while True: + line = await self._read_single_line(reader) + + try: + res = await self.on_new_message(line) + except Exception: + return + + if res != None: + await self._write_single_line(writer, res) + +class PortSocket (UnixSocket): + def __init__(self, on_new_message: Callable[[str], Awaitable[str|None]]): + ''' + on_new_message takes 1 string argument. + It's return value gets used, if not None, to write data to the socket. + Method should be async + ''' + super().__init__(on_new_message) + self.host = "127.0.0.1" + self.port = random.sample(range(40000, 60000), 1)[0] + + async def setup_server(self): + self.socket = await asyncio.start_server(self._listen_for_method_call, host=self.host, port=self.port, limit=BUFFER_LIMIT) + + async def _open_socket_if_not_exists(self): + if not self.reader: + retries = 0 + while retries < 10: + try: + self.reader, self.writer = await asyncio.open_connection(host=self.host, port=self.port, limit=BUFFER_LIMIT) + return True + except: + await asyncio.sleep(2) + retries += 1 + return False + else: + return True + +if ON_WINDOWS: + class LocalSocket (PortSocket): # type: ignore + pass +else: + class LocalSocket (UnixSocket): + pass \ No newline at end of file diff --git a/backend/src/main.py b/backend/src/main.py new file mode 100644 index 00000000..83a4b997 --- /dev/null +++ b/backend/src/main.py @@ -0,0 +1,192 @@ +# Change PyInstaller files permissions +import sys +from typing import Dict +from .localplatform import (chmod, chown, service_stop, service_start, + ON_WINDOWS, get_log_level, get_live_reload, + get_server_port, get_server_host, get_chown_plugin_path, + get_privileged_path) +if hasattr(sys, '_MEIPASS'): + chmod(sys._MEIPASS, 755) # type: ignore +# Full imports +from asyncio import AbstractEventLoop, new_event_loop, set_event_loop, sleep +from logging import basicConfig, getLogger +from os import path +from traceback import format_exc +import multiprocessing + +import aiohttp_cors # type: ignore +# Partial imports +from aiohttp import client_exceptions +from aiohttp.web import Application, Response, Request, get, run_app, static # type: ignore +from aiohttp_jinja2 import setup as jinja_setup + +# local modules +from .browser import PluginBrowser +from .helpers import (REMOTE_DEBUGGER_UNIT, csrf_middleware, get_csrf_token, + mkdir_as_user, get_system_pythonpaths, get_effective_user_id) + +from .injector import get_gamepadui_tab, Tab, close_old_tabs +from .loader import Loader +from .settings import SettingsManager +from .updater import Updater +from .utilities import Utilities +from .customtypes import UserType + + +basicConfig( + level=get_log_level(), + format="[%(module)s][%(levelname)s]: %(message)s" +) + +logger = getLogger("Main") +plugin_path = path.join(get_privileged_path(), "plugins") + +def chown_plugin_dir(): + if not path.exists(plugin_path): # For safety, create the folder before attempting to do anything with it + mkdir_as_user(plugin_path) + + if not chown(plugin_path, UserType.HOST_USER) or not chmod(plugin_path, 555): + logger.error(f"chown/chmod exited with a non-zero exit code") + +if get_chown_plugin_path() == True: + chown_plugin_dir() + +class PluginManager: + def __init__(self, loop: AbstractEventLoop) -> None: + self.loop = loop + self.web_app = Application() + self.web_app.middlewares.append(csrf_middleware) + self.cors = aiohttp_cors.setup(self.web_app, defaults={ + "https://steamloopback.host": aiohttp_cors.ResourceOptions( + expose_headers="*", + allow_headers="*", + allow_credentials=True + ) + }) + self.plugin_loader = Loader(self, plugin_path, self.loop, get_live_reload()) + self.settings = SettingsManager("loader", path.join(get_privileged_path(), "settings")) + self.plugin_browser = PluginBrowser(plugin_path, self.plugin_loader.plugins, self.plugin_loader, self.settings) + self.utilities = Utilities(self) + self.updater = Updater(self) + + jinja_setup(self.web_app) + + async def startup(_: Application): + if self.settings.getSetting("cef_forward", False): + self.loop.create_task(service_start(REMOTE_DEBUGGER_UNIT)) + else: + self.loop.create_task(service_stop(REMOTE_DEBUGGER_UNIT)) + self.loop.create_task(self.loader_reinjector()) + self.loop.create_task(self.load_plugins()) + + self.web_app.on_startup.append(startup) + + self.loop.set_exception_handler(self.exception_handler) + self.web_app.add_routes([get("/auth/token", self.get_auth_token)]) + + for route in list(self.web_app.router.routes()): + self.cors.add(route) # type: ignore + self.web_app.add_routes([static("/static", path.join(path.dirname(__file__), '..', 'static'))]) + self.web_app.add_routes([static("/legacy", path.join(path.dirname(__file__), 'legacy'))]) + + def exception_handler(self, loop: AbstractEventLoop, context: Dict[str, str]): + if context["message"] == "Unclosed connection": + return + loop.default_exception_handler(context) + + async def get_auth_token(self, request: Request): + return Response(text=get_csrf_token()) + + async def load_plugins(self): + # await self.wait_for_server() + logger.debug("Loading plugins") + self.plugin_loader.import_plugins() + # await inject_to_tab("SP", "window.syncDeckyPlugins();") + if self.settings.getSetting("pluginOrder", None) == None: + self.settings.setSetting("pluginOrder", list(self.plugin_loader.plugins.keys())) + logger.debug("Did not find pluginOrder setting, set it to default") + + async def loader_reinjector(self): + while True: + tab = None + nf = False + dc = False + while not tab: + try: + tab = await get_gamepadui_tab() + except (client_exceptions.ClientConnectorError, client_exceptions.ServerDisconnectedError): + if not dc: + logger.debug("Couldn't connect to debugger, waiting...") + dc = True + pass + except ValueError: + if not nf: + logger.debug("Couldn't find GamepadUI tab, waiting...") + nf = True + pass + if not tab: + await sleep(5) + await tab.open_websocket() + await tab.enable() + await self.inject_javascript(tab, True) + try: + async for msg in tab.listen_for_message(): + # this gets spammed a lot + if msg.get("method", None) != "Page.navigatedWithinDocument": + logger.debug("Page event: " + str(msg.get("method", None))) + if msg.get("method", None) == "Page.domContentEventFired": + if not await tab.has_global_var("deckyHasLoaded", False): + await self.inject_javascript(tab) + if msg.get("method", None) == "Inspector.detached": + logger.info("CEF has requested that we detach.") + await tab.close_websocket() + break + # If this is a forceful disconnect the loop will just stop without any failure message. In this case, injector.py will handle this for us so we don't need to close the socket. + # This is because of https://github.com/aio-libs/aiohttp/blob/3ee7091b40a1bc58a8d7846e7878a77640e96996/aiohttp/client_ws.py#L321 + logger.info("CEF has disconnected...") + # At this point the loop starts again and we connect to the freshly started Steam client once it is ready. + except Exception: + logger.error("Exception while reading page events " + format_exc()) + await tab.close_websocket() + pass + # while True: + # await sleep(5) + # if not await tab.has_global_var("deckyHasLoaded", False): + # logger.info("Plugin loader isn't present in Steam anymore, reinjecting...") + # await self.inject_javascript(tab) + + async def inject_javascript(self, tab: Tab, first: bool=False, request: Request|None=None): + logger.info("Loading Decky frontend!") + try: + if first: + if await tab.has_global_var("deckyHasLoaded", False): + await close_old_tabs() + await tab.evaluate_js("try{if (window.deckyHasLoaded){setTimeout(() => location.reload(), 100)}else{window.deckyHasLoaded = true;(async()=>{try{while(!window.SP_REACT){await new Promise(r => setTimeout(r, 10))};await import('http://localhost:1337/frontend/index.js')}catch(e){console.error(e)};})();}}catch(e){console.error(e)}", False, False, False) + except: + logger.info("Failed to inject JavaScript into tab\n" + format_exc()) + pass + + def run(self): + return run_app(self.web_app, host=get_server_host(), port=get_server_port(), loop=self.loop, access_log=None) + +def main(): + if ON_WINDOWS: + # Fix windows/flask not recognising that .js means 'application/javascript' + import mimetypes + mimetypes.add_type('application/javascript', '.js') + + # Required for multiprocessing support in frozen files + multiprocessing.freeze_support() + else: + if get_effective_user_id() != 0: + logger.warning(f"decky is running as an unprivileged user, this is not officially supported and may cause issues") + + # Append the loader's plugin path to the recognized python paths + sys.path.append(path.join(path.dirname(__file__), "plugin")) + + # Append the system and user python paths + sys.path.extend(get_system_pythonpaths()) + + loop = new_event_loop() + set_event_loop(loop) + PluginManager(loop).run() diff --git a/backend/src/plugin.py b/backend/src/plugin.py new file mode 100644 index 00000000..163bb9b6 --- /dev/null +++ b/backend/src/plugin.py @@ -0,0 +1,163 @@ +import multiprocessing +from asyncio import (Lock, get_event_loop, new_event_loop, + set_event_loop, sleep) +from importlib.util import module_from_spec, spec_from_file_location +from json import dumps, load, loads +from logging import getLogger +from traceback import format_exc +from os import path, environ +from signal import SIGINT, signal +from sys import exit, path as syspath +from typing import Any, Dict +from .localsocket import LocalSocket +from .localplatform import setgid, setuid, get_username, get_home_path +from .customtypes import UserType +from . import helpers + +class PluginWrapper: + def __init__(self, file: str, plugin_directory: str, plugin_path: str) -> None: + self.file = file + self.plugin_path = plugin_path + self.plugin_directory = plugin_directory + self.method_call_lock = Lock() + self.socket: LocalSocket = LocalSocket(self._on_new_message) + + self.version = None + + json = load(open(path.join(plugin_path, plugin_directory, "plugin.json"), "r", encoding="utf-8")) + if path.isfile(path.join(plugin_path, plugin_directory, "package.json")): + package_json = load(open(path.join(plugin_path, plugin_directory, "package.json"), "r", encoding="utf-8")) + self.version = package_json["version"] + + self.legacy = False + self.main_view_html = json["main_view_html"] if "main_view_html" in json else "" + self.tile_view_html = json["tile_view_html"] if "tile_view_html" in json else "" + self.legacy = self.main_view_html or self.tile_view_html + + self.name = json["name"] + self.author = json["author"] + self.flags = json["flags"] + + self.log = getLogger("plugin") + + self.passive = not path.isfile(self.file) + + def __str__(self) -> str: + return self.name + + def _init(self): + try: + signal(SIGINT, lambda s, f: exit(0)) + + set_event_loop(new_event_loop()) + if self.passive: + return + setgid(UserType.ROOT if "root" in self.flags else UserType.HOST_USER) + setuid(UserType.ROOT if "root" in self.flags else UserType.HOST_USER) + # export a bunch of environment variables to help plugin developers + environ["HOME"] = get_home_path(UserType.ROOT if "root" in self.flags else UserType.HOST_USER) + environ["USER"] = "root" if "root" in self.flags else get_username() + environ["DECKY_VERSION"] = helpers.get_loader_version() + environ["DECKY_USER"] = get_username() + environ["DECKY_USER_HOME"] = helpers.get_home_path() + environ["DECKY_HOME"] = helpers.get_homebrew_path() + environ["DECKY_PLUGIN_SETTINGS_DIR"] = path.join(environ["DECKY_HOME"], "settings", self.plugin_directory) + helpers.mkdir_as_user(path.join(environ["DECKY_HOME"], "settings")) + helpers.mkdir_as_user(environ["DECKY_PLUGIN_SETTINGS_DIR"]) + environ["DECKY_PLUGIN_RUNTIME_DIR"] = path.join(environ["DECKY_HOME"], "data", self.plugin_directory) + helpers.mkdir_as_user(path.join(environ["DECKY_HOME"], "data")) + helpers.mkdir_as_user(environ["DECKY_PLUGIN_RUNTIME_DIR"]) + environ["DECKY_PLUGIN_LOG_DIR"] = path.join(environ["DECKY_HOME"], "logs", self.plugin_directory) + helpers.mkdir_as_user(path.join(environ["DECKY_HOME"], "logs")) + helpers.mkdir_as_user(environ["DECKY_PLUGIN_LOG_DIR"]) + environ["DECKY_PLUGIN_DIR"] = path.join(self.plugin_path, self.plugin_directory) + environ["DECKY_PLUGIN_NAME"] = self.name + if self.version: + environ["DECKY_PLUGIN_VERSION"] = self.version + environ["DECKY_PLUGIN_AUTHOR"] = self.author + + # append the plugin's `py_modules` to the recognized python paths + syspath.append(path.join(environ["DECKY_PLUGIN_DIR"], "py_modules")) + + spec = spec_from_file_location("_", self.file) + assert spec is not None + module = module_from_spec(spec) + assert spec.loader is not None + spec.loader.exec_module(module) + self.Plugin = module.Plugin + + if hasattr(self.Plugin, "_migration"): + get_event_loop().run_until_complete(self.Plugin._migration(self.Plugin)) + if hasattr(self.Plugin, "_main"): + get_event_loop().create_task(self.Plugin._main(self.Plugin)) + get_event_loop().create_task(self.socket.setup_server()) + get_event_loop().run_forever() + except: + self.log.error("Failed to start " + self.name + "!\n" + format_exc()) + exit(0) + + async def _unload(self): + try: + self.log.info("Attempting to unload with plugin " + self.name + "'s \"_unload\" function.\n") + if hasattr(self.Plugin, "_unload"): + await self.Plugin._unload(self.Plugin) + self.log.info("Unloaded " + self.name + "\n") + else: + self.log.info("Could not find \"_unload\" in " + self.name + "'s main.py" + "\n") + except: + self.log.error("Failed to unload " + self.name + "!\n" + format_exc()) + exit(0) + + async def _on_new_message(self, message : str) -> str|None: + data = loads(message) + + if "stop" in data: + self.log.info("Calling Loader unload function.") + await self._unload() + get_event_loop().stop() + while get_event_loop().is_running(): + await sleep(0) + get_event_loop().close() + raise Exception("Closing message listener") + + # TODO there is definitely a better way to type this + d: Dict[str, Any] = {"res": None, "success": True} + try: + d["res"] = await getattr(self.Plugin, data["method"])(self.Plugin, **data["args"]) + except Exception as e: + d["res"] = str(e) + d["success"] = False + finally: + return dumps(d, ensure_ascii=False) + + def start(self): + if self.passive: + return self + multiprocessing.Process(target=self._init).start() + return self + + def stop(self): + if self.passive: + return + + async def _(self: PluginWrapper): + await self.socket.write_single_line(dumps({ "stop": True }, ensure_ascii=False)) + await self.socket.close_socket_connection() + + get_event_loop().create_task(_(self)) + + async def execute_method(self, method_name: str, kwargs: Dict[Any, Any]): + if self.passive: + raise RuntimeError("This plugin is passive (aka does not implement main.py)") + async with self.method_call_lock: + # reader, writer = + await self.socket.get_socket_connection() + + await self.socket.write_single_line(dumps({ "method": method_name, "args": kwargs }, ensure_ascii=False)) + + line = await self.socket.read_single_line() + if line != None: + res = loads(line) + if not res["success"]: + raise Exception(res["res"]) + return res["res"] \ No newline at end of file diff --git a/backend/src/settings.py b/backend/src/settings.py new file mode 100644 index 00000000..a9ab3daa --- /dev/null +++ b/backend/src/settings.py @@ -0,0 +1,60 @@ +from json import dump, load +from os import mkdir, path, listdir, rename +from typing import Any, Dict +from .localplatform import chown, folder_owner, get_chown_plugin_path +from .customtypes import UserType + +from .helpers import get_homebrew_path + + +class SettingsManager: + def __init__(self, name: str, settings_directory: str | None = None) -> None: + wrong_dir = get_homebrew_path() + if settings_directory == None: + settings_directory = path.join(wrong_dir, "settings") + + self.path = path.join(settings_directory, name + ".json") + + #Create the folder with the correct permission + if not path.exists(settings_directory): + mkdir(settings_directory) + + #Copy all old settings file in the root directory to the correct folder + for file in listdir(wrong_dir): + if file.endswith(".json"): + rename(path.join(wrong_dir,file), + path.join(settings_directory, file)) + self.path = path.join(settings_directory, name + ".json") + + + #If the owner of the settings directory is not the user, then set it as the user: + expected_user = UserType.HOST_USER if get_chown_plugin_path() else UserType.ROOT + if folder_owner(settings_directory) != expected_user: + chown(settings_directory, expected_user, False) + + self.settings: Dict[str, Any] = {} + + try: + open(self.path, "x", encoding="utf-8") + except FileExistsError as _: + self.read() + pass + + def read(self): + try: + with open(self.path, "r", encoding="utf-8") as file: + self.settings = load(file) + except Exception as e: + print(e) + pass + + def commit(self): + with open(self.path, "w+", encoding="utf-8") as file: + dump(self.settings, file, indent=4, ensure_ascii=False) + + def getSetting(self, key: str, default: Any = None) -> Any: + return self.settings.get(key, default) + + def setSetting(self, key: str, value: Any) -> Any: + self.settings[key] = value + self.commit() diff --git a/backend/src/updater.py b/backend/src/updater.py new file mode 100644 index 00000000..d28e67b0 --- /dev/null +++ b/backend/src/updater.py @@ -0,0 +1,238 @@ +from __future__ import annotations +import os +import shutil +from asyncio import sleep +from json.decoder import JSONDecodeError +from logging import getLogger +from os import getcwd, path, remove +from typing import TYPE_CHECKING, List, TypedDict +if TYPE_CHECKING: + from .main import PluginManager +from .localplatform import chmod, service_restart, ON_LINUX, get_keep_systemd_service, get_selinux + +from aiohttp import ClientSession, web + +from . import helpers +from .injector import get_gamepadui_tab +from .settings import SettingsManager + +logger = getLogger("Updater") + +class RemoteVerAsset(TypedDict): + name: str + browser_download_url: str +class RemoteVer(TypedDict): + tag_name: str + prerelease: bool + assets: List[RemoteVerAsset] + +class Updater: + def __init__(self, context: PluginManager) -> None: + self.context = context + self.settings = self.context.settings + # Exposes updater methods to frontend + self.updater_methods = { + "get_branch": self._get_branch, + "get_version": self.get_version, + "do_update": self.do_update, + "do_restart": self.do_restart, + "check_for_updates": self.check_for_updates + } + self.remoteVer: RemoteVer | None = None + self.allRemoteVers: List[RemoteVer] = [] + self.localVer = helpers.get_loader_version() + + try: + self.currentBranch = self.get_branch(self.context.settings) + except: + self.currentBranch = 0 + logger.error("Current branch could not be determined, defaulting to \"Stable\"") + + if context: + context.web_app.add_routes([ + web.post("/updater/{method_name}", self._handle_server_method_call) + ]) + context.loop.create_task(self.version_reloader()) + + async def _handle_server_method_call(self, request: web.Request): + method_name = request.match_info["method_name"] + try: + args = await request.json() + except JSONDecodeError: + args = {} + res = {} + try: + r = await self.updater_methods[method_name](**args) # type: ignore + res["result"] = r + res["success"] = True + except Exception as e: + res["result"] = str(e) + res["success"] = False + return web.json_response(res) + + def get_branch(self, manager: SettingsManager): + ver = manager.getSetting("branch", -1) + logger.debug("current branch: %i" % ver) + if ver == -1: + logger.info("Current branch is not set, determining branch from version...") + if self.localVer.startswith("v") and "-pre" in self.localVer: + logger.info("Current version determined to be pre-release") + manager.setSetting('branch', 1) + return 1 + else: + logger.info("Current version determined to be stable") + manager.setSetting('branch', 0) + return 0 + return ver + + async def _get_branch(self, manager: SettingsManager): + return self.get_branch(manager) + + # retrieve relevant service file's url for each branch + def get_service_url(self): + logger.debug("Getting service URL") + branch = self.get_branch(self.context.settings) + match branch: + case 0: + url = "https://raw.githubusercontent.com/SteamDeckHomebrew/decky-loader/main/dist/plugin_loader-release.service" + case 1 | 2: + url = "https://raw.githubusercontent.com/SteamDeckHomebrew/decky-loader/main/dist/plugin_loader-prerelease.service" + case _: + logger.error("You have an invalid branch set... Defaulting to prerelease service, please send the logs to the devs!") + url = "https://raw.githubusercontent.com/SteamDeckHomebrew/decky-loader/main/dist/plugin_loader-prerelease.service" + return str(url) + + async def get_version(self): + return { + "current": self.localVer, + "remote": self.remoteVer, + "all": self.allRemoteVers, + "updatable": self.localVer != "unknown" + } + + async def check_for_updates(self): + logger.debug("checking for updates") + selectedBranch = self.get_branch(self.context.settings) + async with ClientSession() as web: + async with web.request("GET", "https://api.github.com/repos/SteamDeckHomebrew/decky-loader/releases", ssl=helpers.get_ssl_context()) as res: + remoteVersions: List[RemoteVer] = await res.json() + if selectedBranch == 0: + logger.debug("release type: release") + remoteVersions = list(filter(lambda ver: ver["tag_name"].startswith("v") and not ver["prerelease"] and not ver["tag_name"].find("-pre") > 0 and ver["tag_name"], remoteVersions)) + elif selectedBranch == 1: + logger.debug("release type: pre-release") + remoteVersions = list(filter(lambda ver:ver["tag_name"].startswith("v"), remoteVersions)) + else: + logger.error("release type: NOT FOUND") + raise ValueError("no valid branch found") + self.allRemoteVers = remoteVersions + logger.debug("determining release type to find, branch is %i" % selectedBranch) + if selectedBranch == 0: + logger.debug("release type: release") + self.remoteVer = next(filter(lambda ver: ver["tag_name"].startswith("v") and not ver["prerelease"] and not ver["tag_name"].find("-pre") > 0 and ver["tag_name"], remoteVersions), None) + elif selectedBranch == 1: + logger.debug("release type: pre-release") + self.remoteVer = next(filter(lambda ver:ver["tag_name"].startswith("v"), remoteVersions), None) + else: + logger.error("release type: NOT FOUND") + raise ValueError("no valid branch found") + logger.info("Updated remote version information") + tab = await get_gamepadui_tab() + await tab.evaluate_js(f"window.DeckyPluginLoader.notifyUpdates()", False, True, False) + return await self.get_version() + + async def version_reloader(self): + await sleep(30) + while True: + try: + await self.check_for_updates() + except: + pass + await sleep(60 * 60 * 6) # 6 hours + + async def do_update(self): + logger.debug("Starting update.") + try: + assert self.remoteVer + except AssertionError: + logger.error("Unable to update as remoteVer is missing") + return + + version = self.remoteVer["tag_name"] + download_url = None + download_filename = "PluginLoader" if ON_LINUX else "PluginLoader.exe" + download_temp_filename = download_filename + ".new" + + for x in self.remoteVer["assets"]: + if x["name"] == download_filename: + download_url = x["browser_download_url"] + break + + if download_url == None: + raise Exception("Download url not found") + + service_url = self.get_service_url() + logger.debug("Retrieved service URL") + + tab = await get_gamepadui_tab() + await tab.open_websocket() + async with ClientSession() as web: + if ON_LINUX and not get_keep_systemd_service(): + logger.debug("Downloading systemd service") + # download the relevant systemd service depending upon branch + async with web.request("GET", service_url, ssl=helpers.get_ssl_context(), allow_redirects=True) as res: + logger.debug("Downloading service file") + data = await res.content.read() + logger.debug(str(data)) + service_file_path = path.join(getcwd(), "plugin_loader.service") + try: + with open(path.join(getcwd(), "plugin_loader.service"), "wb") as out: + out.write(data) + except Exception as e: + logger.error(f"Error at %s", exc_info=e) + with open(path.join(getcwd(), "plugin_loader.service"), "r", encoding="utf-8") as service_file: + service_data = service_file.read() + service_data = service_data.replace("${HOMEBREW_FOLDER}", helpers.get_homebrew_path()) + with open(path.join(getcwd(), "plugin_loader.service"), "w", encoding="utf-8") as service_file: + service_file.write(service_data) + + logger.debug("Saved service file") + logger.debug("Copying service file over current file.") + shutil.copy(service_file_path, "/etc/systemd/system/plugin_loader.service") + if not os.path.exists(path.join(getcwd(), ".systemd")): + os.mkdir(path.join(getcwd(), ".systemd")) + shutil.move(service_file_path, path.join(getcwd(), ".systemd")+"/plugin_loader.service") + + logger.debug("Downloading binary") + async with web.request("GET", download_url, ssl=helpers.get_ssl_context(), allow_redirects=True) as res: + total = int(res.headers.get('content-length', 0)) + with open(path.join(getcwd(), download_temp_filename), "wb") as out: + progress = 0 + raw = 0 + async for c in res.content.iter_chunked(512): + out.write(c) + raw += len(c) + new_progress = round((raw / total) * 100) + if progress != new_progress: + self.context.loop.create_task(tab.evaluate_js(f"window.DeckyUpdater.updateProgress({new_progress})", False, False, False)) + progress = new_progress + + with open(path.join(getcwd(), ".loader.version"), "w", encoding="utf-8") as out: + out.write(version) + + if ON_LINUX: + remove(path.join(getcwd(), download_filename)) + shutil.move(path.join(getcwd(), download_temp_filename), path.join(getcwd(), download_filename)) + chmod(path.join(getcwd(), download_filename), 777, False) + if get_selinux(): + from asyncio.subprocess import create_subprocess_exec + process = await create_subprocess_exec("chcon", "-t", "bin_t", path.join(getcwd(), download_filename)) + logger.info(f"Setting the executable flag with chcon returned {await process.wait()}") + + logger.info("Updated loader installation.") + await tab.evaluate_js("window.DeckyUpdater.finish()", False, False) + await self.do_restart() + await tab.close_websocket() + + async def do_restart(self): + await service_restart("plugin_loader") diff --git a/backend/src/utilities.py b/backend/src/utilities.py new file mode 100644 index 00000000..b0e23b88 --- /dev/null +++ b/backend/src/utilities.py @@ -0,0 +1,373 @@ +from __future__ import annotations +from os import stat_result +import uuid +from json.decoder import JSONDecodeError +from os.path import splitext +import re +from traceback import format_exc +from stat import FILE_ATTRIBUTE_HIDDEN # type: ignore + +from asyncio import StreamReader, StreamWriter, start_server, gather, open_connection +from aiohttp import ClientSession, web +from typing import TYPE_CHECKING, Callable, Coroutine, Dict, Any, List, TypedDict + +from logging import getLogger +from pathlib import Path + +from .browser import PluginInstallRequest, PluginInstallType +if TYPE_CHECKING: + from .main import PluginManager +from .injector import inject_to_tab, get_gamepadui_tab, close_old_tabs, get_tab +from .localplatform import ON_WINDOWS +from . import helpers +from .localplatform import service_stop, service_start, get_home_path, get_username + +class FilePickerObj(TypedDict): + file: Path + filest: stat_result + is_dir: bool + +class Utilities: + def __init__(self, context: PluginManager) -> None: + self.context = context + self.util_methods: Dict[str, Callable[..., Coroutine[Any, Any, Any]]] = { + "ping": self.ping, + "http_request": self.http_request, + "install_plugin": self.install_plugin, + "install_plugins": self.install_plugins, + "cancel_plugin_install": self.cancel_plugin_install, + "confirm_plugin_install": self.confirm_plugin_install, + "uninstall_plugin": self.uninstall_plugin, + "execute_in_tab": self.execute_in_tab, + "inject_css_into_tab": self.inject_css_into_tab, + "remove_css_from_tab": self.remove_css_from_tab, + "allow_remote_debugging": self.allow_remote_debugging, + "disallow_remote_debugging": self.disallow_remote_debugging, + "set_setting": self.set_setting, + "get_setting": self.get_setting, + "filepicker_ls": self.filepicker_ls, + "disable_rdt": self.disable_rdt, + "enable_rdt": self.enable_rdt, + "get_tab_id": self.get_tab_id, + "get_user_info": self.get_user_info, + } + + self.logger = getLogger("Utilities") + + self.rdt_proxy_server = None + self.rdt_script_id = None + self.rdt_proxy_task = None + + if context: + context.web_app.add_routes([ + web.post("/methods/{method_name}", self._handle_server_method_call) + ]) + + async def _handle_server_method_call(self, request: web.Request): + method_name = request.match_info["method_name"] + try: + args = await request.json() + except JSONDecodeError: + args = {} + res = {} + try: + r = await self.util_methods[method_name](**args) + res["result"] = r + res["success"] = True + except Exception as e: + res["result"] = str(e) + res["success"] = False + return web.json_response(res) + + async def install_plugin(self, artifact: str="", name: str="No name", version: str="dev", hash: str="", install_type: PluginInstallType=PluginInstallType.INSTALL): + return await self.context.plugin_browser.request_plugin_install( + artifact=artifact, + name=name, + version=version, + hash=hash, + install_type=install_type + ) + + async def install_plugins(self, requests: List[PluginInstallRequest]): + return await self.context.plugin_browser.request_multiple_plugin_installs( + requests=requests + ) + + async def confirm_plugin_install(self, request_id: str): + return await self.context.plugin_browser.confirm_plugin_install(request_id) + + async def cancel_plugin_install(self, request_id: str): + return self.context.plugin_browser.cancel_plugin_install(request_id) + + async def uninstall_plugin(self, name: str): + return await self.context.plugin_browser.uninstall_plugin(name) + + async def http_request(self, method: str="", url: str="", **kwargs: Any): + async with ClientSession() as web: + res = await web.request(method, url, ssl=helpers.get_ssl_context(), **kwargs) + text = await res.text() + return { + "status": res.status, + "headers": dict(res.headers), + "body": text + } + + async def ping(self, **kwargs: Any): + return "pong" + + async def execute_in_tab(self, tab: str, run_async: bool, code: str): + try: + result = await inject_to_tab(tab, code, run_async) + assert result + if "exceptionDetails" in result["result"]: + return { + "success": False, + "result": result["result"] + } + + return { + "success": True, + "result": result["result"]["result"].get("value") + } + except Exception as e: + return { + "success": False, + "result": e + } + + async def inject_css_into_tab(self, tab: str, style: str): + try: + css_id = str(uuid.uuid4()) + + result = await inject_to_tab(tab, + f""" + (function() {{ + const style = document.createElement('style'); + style.id = "{css_id}"; + document.head.append(style); + style.textContent = `{style}`; + }})() + """, False) + + if result and "exceptionDetails" in result["result"]: + return { + "success": False, + "result": result["result"] + } + + return { + "success": True, + "result": css_id + } + except Exception as e: + return { + "success": False, + "result": e + } + + async def remove_css_from_tab(self, tab: str, css_id: str): + try: + result = await inject_to_tab(tab, + f""" + (function() {{ + let style = document.getElementById("{css_id}"); + + if (style.nodeName.toLowerCase() == 'style') + style.parentNode.removeChild(style); + }})() + """, False) + + if result and "exceptionDetails" in result["result"]: + return { + "success": False, + "result": result + } + + return { + "success": True + } + except Exception as e: + return { + "success": False, + "result": e + } + + async def get_setting(self, key: str, default: Any): + return self.context.settings.getSetting(key, default) + + async def set_setting(self, key: str, value: Any): + return self.context.settings.setSetting(key, value) + + async def allow_remote_debugging(self): + await service_start(helpers.REMOTE_DEBUGGER_UNIT) + return True + + async def disallow_remote_debugging(self): + await service_stop(helpers.REMOTE_DEBUGGER_UNIT) + return True + + async def filepicker_ls(self, + path : str | None = None, + include_files: bool = True, + include_folders: bool = True, + include_ext: list[str] = [], + include_hidden: bool = False, + order_by: str = "name_asc", + filter_for: str | None = None, + page: int = 1, + max: int = 1000): + + if path == None: + path = get_home_path() + + path_obj = Path(path).resolve() + + files: List[FilePickerObj] = [] + folders: List[FilePickerObj] = [] + + #Resolving all files/folders in the requested directory + for file in path_obj.iterdir(): + if file.exists(): + filest = file.stat() + is_hidden = file.name.startswith('.') + if ON_WINDOWS and not is_hidden: + is_hidden = bool(filest.st_file_attributes & FILE_ATTRIBUTE_HIDDEN) # type: ignore + if include_folders and file.is_dir(): + if (is_hidden and include_hidden) or not is_hidden: + folders.append({"file": file, "filest": filest, "is_dir": True}) + elif include_files: + # Handle requested extensions if present + if len(include_ext) == 0 or 'all_files' in include_ext \ + or splitext(file.name)[1].lstrip('.') in include_ext: + if (is_hidden and include_hidden) or not is_hidden: + files.append({"file": file, "filest": filest, "is_dir": False}) + # Filter logic + if filter_for is not None: + try: + if re.compile(filter_for): + files = list(filter(lambda file: re.search(filter_for, file["file"].name) != None, files)) + except re.error: + files = list(filter(lambda file: file["file"].name.find(filter_for) != -1, files)) + + # Ordering logic + ord_arg = order_by.split("_") + ord = ord_arg[0] + rev = True if ord_arg[1] == "asc" else False + match ord: + case 'name': + files.sort(key=lambda x: x['file'].name.casefold(), reverse = rev) + folders.sort(key=lambda x: x['file'].name.casefold(), reverse = rev) + case 'modified': + files.sort(key=lambda x: x['filest'].st_mtime, reverse = not rev) + folders.sort(key=lambda x: x['filest'].st_mtime, reverse = not rev) + case 'created': + files.sort(key=lambda x: x['filest'].st_ctime, reverse = not rev) + folders.sort(key=lambda x: x['filest'].st_ctime, reverse = not rev) + case 'size': + files.sort(key=lambda x: x['filest'].st_size, reverse = not rev) + # Folders has no file size, order by name instead + folders.sort(key=lambda x: x['file'].name.casefold()) + case _: + files.sort(key=lambda x: x['file'].name.casefold(), reverse = rev) + folders.sort(key=lambda x: x['file'].name.casefold(), reverse = rev) + + #Constructing the final file list, folders first + all = [{ + "isdir": x['is_dir'], + "name": str(x['file'].name), + "realpath": str(x['file']), + "size": x['filest'].st_size, + "modified": x['filest'].st_mtime, + "created": x['filest'].st_ctime, + } for x in folders + files ] + + return { + "realpath": str(path), + "files": all[(page-1)*max:(page)*max], + "total": len(all), + } + + + # Based on https://stackoverflow.com/a/46422554/13174603 + def start_rdt_proxy(self, ip: str, port: int): + async def pipe(reader: StreamReader, writer: StreamWriter): + try: + while not reader.at_eof(): + writer.write(await reader.read(2048)) + finally: + writer.close() + async def handle_client(local_reader: StreamReader, local_writer: StreamWriter): + try: + remote_reader, remote_writer = await open_connection( + ip, port) + pipe1 = pipe(local_reader, remote_writer) + pipe2 = pipe(remote_reader, local_writer) + await gather(pipe1, pipe2) + finally: + local_writer.close() + + self.rdt_proxy_server = start_server(handle_client, "127.0.0.1", port) + self.rdt_proxy_task = self.context.loop.create_task(self.rdt_proxy_server) + + def stop_rdt_proxy(self): + if self.rdt_proxy_server != None: + self.rdt_proxy_server.close() + if self.rdt_proxy_task: + self.rdt_proxy_task.cancel() + + async def _enable_rdt(self): + # TODO un-hardcode port + try: + self.stop_rdt_proxy() + ip = self.context.settings.getSetting("developer.rdt.ip", None) + + if ip != None: + self.logger.info("Connecting to React DevTools at " + ip) + async with ClientSession() as web: + res = await web.request("GET", "http://" + ip + ":8097", ssl=helpers.get_ssl_context()) + script = """ + if (!window.deckyHasConnectedRDT) { + window.deckyHasConnectedRDT = true; + // This fixes the overlay when hovering over an element in RDT + Object.defineProperty(window, '__REACT_DEVTOOLS_TARGET_WINDOW__', { + enumerable: true, + configurable: true, + get: function() { + return (GamepadNavTree?.m_context?.m_controller || FocusNavController)?.m_ActiveContext?.ActiveWindow || window; + } + }); + """ + await res.text() + "\n}" + if res.status != 200: + self.logger.error("Failed to connect to React DevTools at " + ip) + return False + self.start_rdt_proxy(ip, 8097) + self.logger.info("Connected to React DevTools, loading script") + tab = await get_gamepadui_tab() + # RDT needs to load before React itself to work. + await close_old_tabs() + result = await tab.reload_and_evaluate(script) + self.logger.info(result) + + except Exception: + self.logger.error("Failed to connect to React DevTools") + self.logger.error(format_exc()) + + async def enable_rdt(self): + self.context.loop.create_task(self._enable_rdt()) + + async def disable_rdt(self): + self.logger.info("Disabling React DevTools") + tab = await get_gamepadui_tab() + self.rdt_script_id = None + await close_old_tabs() + await tab.evaluate_js("location.reload();", False, True, False) + self.logger.info("React DevTools disabled") + + async def get_user_info(self) -> Dict[str, str]: + return { + "username": get_username(), + "path": get_home_path() + } + + async def get_tab_id(self, name: str): + return (await get_tab(name)).id -- cgit v1.2.3