summaryrefslogtreecommitdiff
path: root/backend/decky_loader
diff options
context:
space:
mode:
authorK900 <me@0upti.me>2023-11-14 00:40:37 +0300
committerGitHub <noreply@github.com>2023-11-13 23:40:37 +0200
commit5a633fdd8284dd1a2b6f3c95806f033ef4a4becf (patch)
treeb89f3660d3b8918484e6bc153003a84b95207045 /backend/decky_loader
parent8ce4a7679e9f0abb67e85822fefe08237ec9d82e (diff)
downloaddecky-loader-5a633fdd8284dd1a2b6f3c95806f033ef4a4becf.tar.gz
decky-loader-5a633fdd8284dd1a2b6f3c95806f033ef4a4becf.zip
* fix: get rid of title view jank on latest beta * Count the number of installs for each plugin (#557) * Bump aiohttp from 3.8.4 to 3.8.5 in /backend (#558) * fix: include Decky version in request for index.js This avoids the If-Modified-Since logic in aiohttp and ensures Steam doesn't cache old JS, even if the timestamps are normalized. * fix: clean up shellcheck warnings in act runner script * fix: gitignore settings/ * fix: ensure state directories exist when running without the installer * feat: determine root directory correctly when running from in-tree * fix: fix typo in CI script * refactor: build a proper Python package with poetry * refactor: move decky_plugin under the poetry structure There's no need to special case it anymore, just treat it like any other Python module. * sandboxed_plugin: better fix, attempt 2 --------- Co-authored-by: AAGaming <aagaming@riseup.net> Co-authored-by: Party Wumpus <48649272+PartyWumpus@users.noreply.github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Diffstat (limited to 'backend/decky_loader')
-rw-r--r--backend/decky_loader/browser.py287
-rw-r--r--backend/decky_loader/customtypes.py6
-rw-r--r--backend/decky_loader/helpers.py153
-rw-r--r--backend/decky_loader/injector.py438
-rw-r--r--backend/decky_loader/loader.py200
-rw-r--r--backend/decky_loader/localplatform/localplatform.py52
-rw-r--r--backend/decky_loader/localplatform/localplatformlinux.py201
-rw-r--r--backend/decky_loader/localplatform/localplatformwin.py55
-rw-r--r--backend/decky_loader/localplatform/localsocket.py145
-rw-r--r--backend/decky_loader/main.py188
-rw-r--r--backend/decky_loader/plugin/method_call_request.py29
-rw-r--r--backend/decky_loader/plugin/plugin.py84
-rw-r--r--backend/decky_loader/plugin/sandboxed_plugin.py138
-rw-r--r--backend/decky_loader/settings.py60
-rw-r--r--backend/decky_loader/updater.py238
-rw-r--r--backend/decky_loader/utilities.py373
16 files changed, 2647 insertions, 0 deletions
diff --git a/backend/decky_loader/browser.py b/backend/decky_loader/browser.py
new file mode 100644
index 00000000..436e8abf
--- /dev/null
+++ b/backend/decky_loader/browser.py
@@ -0,0 +1,287 @@
+# Full imports
+import json
+# import pprint
+# from pprint import pformat
+
+# Partial imports
+from aiohttp import ClientSession
+from asyncio import sleep
+from hashlib import sha256
+from io import BytesIO
+from logging import getLogger
+from os import R_OK, W_OK, path, listdir, access, mkdir
+from shutil import rmtree
+from time import time
+from zipfile import ZipFile
+from enum import IntEnum
+from typing import Dict, List, TypedDict
+
+# Local modules
+from .localplatform.localplatform import chown, chmod
+from .loader import Loader, Plugins
+from .helpers import get_ssl_context, download_remote_binary_to_path
+from .settings import SettingsManager
+from .injector import get_gamepadui_tab
+
+logger = getLogger("Browser")
+
+class PluginInstallType(IntEnum):
+ INSTALL = 0
+ REINSTALL = 1
+ UPDATE = 2
+
+class PluginInstallRequest(TypedDict):
+ name: str
+ artifact: str
+ version: str
+ hash: str
+ install_type: PluginInstallType
+
+class PluginInstallContext:
+ def __init__(self, artifact: str, name: str, version: str, hash: str) -> None:
+ self.artifact = artifact
+ self.name = name
+ self.version = version
+ self.hash = hash
+
+class PluginBrowser:
+ def __init__(self, plugin_path: str, plugins: Plugins, loader: Loader, settings: SettingsManager) -> None:
+ self.plugin_path = plugin_path
+ self.plugins = plugins
+ self.loader = loader
+ self.settings = settings
+ self.install_requests: Dict[str, PluginInstallContext | List[PluginInstallContext]] = {}
+
+ def _unzip_to_plugin_dir(self, zip: BytesIO, name: str, hash: str):
+ zip_hash = sha256(zip.getbuffer()).hexdigest()
+ if hash and (zip_hash != hash):
+ return False
+ zip_file = ZipFile(zip)
+ zip_file.extractall(self.plugin_path)
+ plugin_folder = self.find_plugin_folder(name)
+ assert plugin_folder is not None
+ plugin_dir = path.join(self.plugin_path, plugin_folder)
+
+ if not chown(plugin_dir) or not chmod(plugin_dir, 555):
+ logger.error(f"chown/chmod exited with a non-zero exit code")
+ return False
+ return True
+
+ async def _download_remote_binaries_for_plugin_with_name(self, pluginBasePath: str):
+ rv = False
+ try:
+ packageJsonPath = path.join(pluginBasePath, 'package.json')
+ pluginBinPath = path.join(pluginBasePath, 'bin')
+
+ if access(packageJsonPath, R_OK):
+ with open(packageJsonPath, "r", encoding="utf-8") as f:
+ packageJson = json.load(f)
+ if "remote_binary" in packageJson and len(packageJson["remote_binary"]) > 0:
+ # create bin directory if needed.
+ chmod(pluginBasePath, 777)
+ if access(pluginBasePath, W_OK):
+ if not path.exists(pluginBinPath):
+ mkdir(pluginBinPath)
+ if not access(pluginBinPath, W_OK):
+ chmod(pluginBinPath, 777)
+
+ rv = True
+ for remoteBinary in packageJson["remote_binary"]:
+ # Required Fields. If any Remote Binary is missing these fail the install.
+ binName = remoteBinary["name"]
+ binURL = remoteBinary["url"]
+ binHash = remoteBinary["sha256hash"]
+ if not await download_remote_binary_to_path(binURL, binHash, path.join(pluginBinPath, binName)):
+ rv = False
+ raise Exception(f"Error Downloading Remote Binary {binName}@{binURL} with hash {binHash} to {path.join(pluginBinPath, binName)}")
+
+ chown(self.plugin_path)
+ chmod(pluginBasePath, 555)
+ else:
+ rv = True
+ logger.debug(f"No Remote Binaries to Download")
+
+ except Exception as e:
+ rv = False
+ logger.debug(str(e))
+
+ return rv
+
+ """Return the filename (only) for the specified plugin"""
+ def find_plugin_folder(self, name: str) -> str | None:
+ for folder in listdir(self.plugin_path):
+ try:
+ with open(path.join(self.plugin_path, folder, 'plugin.json'), "r", encoding="utf-8") as f:
+ plugin = json.load(f)
+
+ if plugin['name'] == name:
+ return folder
+ except:
+ logger.debug(f"skipping {folder}")
+
+ async def uninstall_plugin(self, name: str):
+ if self.loader.watcher:
+ self.loader.watcher.disabled = True
+ tab = await get_gamepadui_tab()
+ plugin_folder = self.find_plugin_folder(name)
+ assert plugin_folder is not None
+ plugin_dir = path.join(self.plugin_path, plugin_folder)
+ try:
+ logger.info("uninstalling " + name)
+ logger.info(" at dir " + plugin_dir)
+ logger.debug("calling frontend unload for %s" % str(name))
+ res = await tab.evaluate_js(f"DeckyPluginLoader.unloadPlugin('{name}')")
+ logger.debug("result of unload from UI: %s", res)
+ # plugins_snapshot = self.plugins.copy()
+ # snapshot_string = pformat(plugins_snapshot)
+ # logger.debug("current plugins: %s", snapshot_string)
+ if name in self.plugins:
+ logger.debug("Plugin %s was found", name)
+ self.plugins[name].stop()
+ logger.debug("Plugin %s was stopped", name)
+ del self.plugins[name]
+ logger.debug("Plugin %s was removed from the dictionary", name)
+ self.cleanup_plugin_settings(name)
+ logger.debug("removing files %s" % str(name))
+ rmtree(plugin_dir)
+ except FileNotFoundError:
+ logger.warning(f"Plugin {name} not installed, skipping uninstallation")
+ except Exception as e:
+ logger.error(f"Plugin {name} in {plugin_dir} was not uninstalled")
+ logger.error(f"Error at {str(e)}", exc_info=e)
+ if self.loader.watcher:
+ self.loader.watcher.disabled = False
+
+ async def _install(self, artifact: str, name: str, version: str, hash: str):
+ # Will be set later in code
+ res_zip = None
+
+ # Check if plugin is installed
+ isInstalled = False
+ # Preserve plugin order before removing plugin (uninstall alters the order and removes the plugin from the list)
+ current_plugin_order = self.settings.getSetting("pluginOrder")[:]
+ if self.loader.watcher:
+ self.loader.watcher.disabled = True
+ try:
+ pluginFolderPath = self.find_plugin_folder(name)
+ if pluginFolderPath:
+ isInstalled = True
+ except:
+ logger.error(f"Failed to determine if {name} is already installed, continuing anyway.")
+
+ # Check if the file is a local file or a URL
+ if artifact.startswith("file://"):
+ logger.info(f"Installing {name} from local ZIP file (Version: {version})")
+ res_zip = BytesIO(open(artifact[7:], "rb").read())
+ else:
+ logger.info(f"Installing {name} from URL (Version: {version})")
+ async with ClientSession() as client:
+ logger.debug(f"Fetching {artifact}")
+ res = await client.get(artifact, ssl=get_ssl_context())
+ if res.status == 200:
+ logger.debug("Got 200. Reading...")
+ data = await res.read()
+ logger.debug(f"Read {len(data)} bytes")
+ res_zip = BytesIO(data)
+ else:
+ logger.fatal(f"Could not fetch from URL. {await res.text()}")
+
+ storeUrl = ""
+ match self.settings.getSetting("store", 0):
+ case 0: storeUrl = "https://plugins.deckbrew.xyz/plugins" # default
+ case 1: storeUrl = "https://testing.deckbrew.xyz/plugins" # testing
+ case 2: storeUrl = self.settings.getSetting("store-url", "https://plugins.deckbrew.xyz/plugins") # custom
+ case _: storeUrl = "https://plugins.deckbrew.xyz/plugins"
+ logger.info(f"Incrementing installs for {name} from URL {storeUrl} (version {version})")
+ async with ClientSession() as client:
+ res = await client.post(storeUrl+f"/{name}/versions/{version}/increment?isUpdate={isInstalled}", ssl=get_ssl_context())
+ if res.status != 200:
+ logger.error(f"Server did not accept install count increment request. code: {res.status}")
+
+ # Check to make sure we got the file
+ if res_zip is None:
+ logger.fatal(f"Could not fetch {artifact}")
+ return
+
+ # If plugin is installed, uninstall it
+ if isInstalled:
+ try:
+ logger.debug("Uninstalling existing plugin...")
+ await self.uninstall_plugin(name)
+ except:
+ logger.error(f"Plugin {name} could not be uninstalled.")
+
+ # Install the plugin
+ logger.debug("Unzipping...")
+ ret = self._unzip_to_plugin_dir(res_zip, name, hash)
+ if ret:
+ plugin_folder = self.find_plugin_folder(name)
+ assert plugin_folder is not None
+ plugin_dir = path.join(self.plugin_path, plugin_folder)
+ ret = await self._download_remote_binaries_for_plugin_with_name(plugin_dir)
+ if ret:
+ logger.info(f"Installed {name} (Version: {version})")
+ if name in self.loader.plugins:
+ self.loader.plugins[name].stop()
+ self.loader.plugins.pop(name, None)
+ await sleep(1)
+ if not isInstalled:
+ current_plugin_order = self.settings.getSetting("pluginOrder")
+ current_plugin_order.append(name)
+ self.settings.setSetting("pluginOrder", current_plugin_order)
+ logger.debug("Plugin %s was added to the pluginOrder setting", name)
+ self.loader.import_plugin(path.join(plugin_dir, "main.py"), plugin_folder)
+ else:
+ logger.fatal(f"Failed Downloading Remote Binaries")
+ else:
+ logger.fatal(f"SHA-256 Mismatch!!!! {name} (Version: {version})")
+ if self.loader.watcher:
+ self.loader.watcher.disabled = False
+
+ async def request_plugin_install(self, artifact: str, name: str, version: str, hash: str, install_type: PluginInstallType):
+ request_id = str(time())
+ self.install_requests[request_id] = PluginInstallContext(artifact, name, version, hash)
+ tab = await get_gamepadui_tab()
+ await tab.open_websocket()
+ await tab.evaluate_js(f"DeckyPluginLoader.addPluginInstallPrompt('{name}', '{version}', '{request_id}', '{hash}', {install_type})")
+
+ async def request_multiple_plugin_installs(self, requests: List[PluginInstallRequest]):
+ request_id = str(time())
+ self.install_requests[request_id] = [PluginInstallContext(req['artifact'], req['name'], req['version'], req['hash']) for req in requests]
+ js_requests_parameter = ','.join([
+ f"{{ name: '{req['name']}', version: '{req['version']}', hash: '{req['hash']}', install_type: {req['install_type']}}}" for req in requests
+ ])
+
+ tab = await get_gamepadui_tab()
+ await tab.open_websocket()
+ await tab.evaluate_js(f"DeckyPluginLoader.addMultiplePluginsInstallPrompt('{request_id}', [{js_requests_parameter}])")
+
+ async def confirm_plugin_install(self, request_id: str):
+ requestOrRequests = self.install_requests.pop(request_id)
+ if isinstance(requestOrRequests, list):
+ [await self._install(req.artifact, req.name, req.version, req.hash) for req in requestOrRequests]
+ else:
+ await self._install(requestOrRequests.artifact, requestOrRequests.name, requestOrRequests.version, requestOrRequests.hash)
+
+ def cancel_plugin_install(self, request_id: str):
+ self.install_requests.pop(request_id)
+
+ def cleanup_plugin_settings(self, name: str):
+ """Removes any settings related to a plugin. Propably called when a plugin is uninstalled.
+
+ Args:
+ name (string): The name of the plugin
+ """
+ hidden_plugins = self.settings.getSetting("hiddenPlugins", [])
+ if name in hidden_plugins:
+ hidden_plugins.remove(name)
+ self.settings.setSetting("hiddenPlugins", hidden_plugins)
+
+
+ plugin_order = self.settings.getSetting("pluginOrder", [])
+
+ if name in plugin_order:
+ plugin_order.remove(name)
+ self.settings.setSetting("pluginOrder", plugin_order)
+
+ logger.debug("Removed any settings for plugin %s", name)
diff --git a/backend/decky_loader/customtypes.py b/backend/decky_loader/customtypes.py
new file mode 100644
index 00000000..84ebc235
--- /dev/null
+++ b/backend/decky_loader/customtypes.py
@@ -0,0 +1,6 @@
+from enum import Enum
+
+class UserType(Enum):
+ HOST_USER = 1
+ EFFECTIVE_USER = 2
+ ROOT = 3 \ No newline at end of file
diff --git a/backend/decky_loader/helpers.py b/backend/decky_loader/helpers.py
new file mode 100644
index 00000000..e3770c63
--- /dev/null
+++ b/backend/decky_loader/helpers.py
@@ -0,0 +1,153 @@
+import re
+import ssl
+import uuid
+import os
+import subprocess
+from hashlib import sha256
+from io import BytesIO
+
+import certifi
+from aiohttp.web import Request, Response, middleware
+from aiohttp.typedefs import Handler
+from aiohttp import ClientSession
+from .localplatform import localplatform
+from .customtypes import UserType
+from logging import getLogger
+
+REMOTE_DEBUGGER_UNIT = "steam-web-debug-portforward.service"
+
+# global vars
+csrf_token = str(uuid.uuid4())
+ssl_ctx = ssl.create_default_context(cafile=certifi.where())
+
+assets_regex = re.compile("^/plugins/.*/assets/.*")
+frontend_regex = re.compile("^/frontend/.*")
+logger = getLogger("Main")
+
+def get_ssl_context():
+ return ssl_ctx
+
+def get_csrf_token():
+ return csrf_token
+
+@middleware
+async def csrf_middleware(request: Request, handler: Handler):
+ if str(request.method) == "OPTIONS" or request.headers.get('Authentication') == csrf_token or str(request.rel_url) == "/auth/token" or str(request.rel_url).startswith("/plugins/load_main/") or str(request.rel_url).startswith("/static/") or str(request.rel_url).startswith("/steam_resource/") or str(request.rel_url).startswith("/frontend/") or assets_regex.match(str(request.rel_url)) or frontend_regex.match(str(request.rel_url)):
+ return await handler(request)
+ return Response(text='Forbidden', status=403)
+
+# Get the default homebrew path unless a home_path is specified. home_path argument is deprecated
+def get_homebrew_path() -> str:
+ return localplatform.get_unprivileged_path()
+
+# Recursively create path and chown as user
+def mkdir_as_user(path: str):
+ path = os.path.realpath(path)
+ os.makedirs(path, exist_ok=True)
+ localplatform.chown(path)
+
+# Fetches the version of loader
+def get_loader_version() -> str:
+ try:
+ with open(os.path.join(os.getcwd(), ".loader.version"), "r", encoding="utf-8") as version_file:
+ return version_file.readline().strip()
+ except Exception as e:
+ logger.warn(f"Failed to execute get_loader_version(): {str(e)}")
+ return "unknown"
+
+# returns the appropriate system python paths
+def get_system_pythonpaths() -> list[str]:
+ try:
+ # run as normal normal user if on linux to also include user python paths
+ proc = subprocess.run(["python3" if localplatform.ON_LINUX else "python", "-c", "import sys; print('\\n'.join(x for x in sys.path if x))"],
+ # TODO make this less insane
+ capture_output=True, user=localplatform.localplatform._get_user_id() if localplatform.ON_LINUX else None, env={} if localplatform.ON_LINUX else None) # type: ignore
+ return [x.strip() for x in proc.stdout.decode().strip().split("\n")]
+ except Exception as e:
+ logger.warn(f"Failed to execute get_system_pythonpaths(): {str(e)}")
+ return []
+
+# Download Remote Binaries to local Plugin
+async def download_remote_binary_to_path(url: str, binHash: str, path: str) -> bool:
+ rv = False
+ try:
+ if os.access(os.path.dirname(path), os.W_OK):
+ async with ClientSession() as client:
+ res = await client.get(url, ssl=get_ssl_context())
+ if res.status == 200:
+ data = BytesIO(await res.read())
+ remoteHash = sha256(data.getbuffer()).hexdigest()
+ if binHash == remoteHash:
+ data.seek(0)
+ with open(path, 'wb') as f:
+ f.write(data.getbuffer())
+ rv = True
+ else:
+ raise Exception(f"Fatal Error: Hash Mismatch for remote binary {path}@{url}")
+ else:
+ rv = False
+ except:
+ rv = False
+
+ return rv
+
+# Deprecated
+def set_user():
+ pass
+
+# Deprecated
+def set_user_group() -> str:
+ return get_user_group()
+
+#########
+# Below is legacy code, provided for backwards compatibility. This will break on windows
+#########
+
+# Get the user id hosting the plugin loader
+def get_user_id() -> int:
+ return localplatform.localplatform._get_user_id() # pyright: ignore [reportPrivateUsage]
+
+# Get the user hosting the plugin loader
+def get_user() -> str:
+ return localplatform.localplatform._get_user() # pyright: ignore [reportPrivateUsage]
+
+# Get the effective user id of the running process
+def get_effective_user_id() -> int:
+ return localplatform.localplatform._get_effective_user_id() # pyright: ignore [reportPrivateUsage]
+
+# Get the effective user of the running process
+def get_effective_user() -> str:
+ return localplatform.localplatform._get_effective_user() # pyright: ignore [reportPrivateUsage]
+
+# Get the effective user group id of the running process
+def get_effective_user_group_id() -> int:
+ return localplatform.localplatform._get_effective_user_group_id() # pyright: ignore [reportPrivateUsage]
+
+# Get the effective user group of the running process
+def get_effective_user_group() -> str:
+ return localplatform.localplatform._get_effective_user_group() # pyright: ignore [reportPrivateUsage]
+
+# Get the user owner of the given file path.
+def get_user_owner(file_path: str) -> str:
+ return localplatform.localplatform._get_user_owner(file_path) # pyright: ignore [reportPrivateUsage]
+
+# Get the user group of the given file path, or the user group hosting the plugin loader
+def get_user_group(file_path: str | None = None) -> str:
+ return localplatform.localplatform._get_user_group(file_path) # pyright: ignore [reportPrivateUsage]
+
+# Get the group id of the user hosting the plugin loader
+def get_user_group_id() -> int:
+ return localplatform.localplatform._get_user_group_id() # pyright: ignore [reportPrivateUsage]
+
+# Get the default home path unless a user is specified
+def get_home_path(username: str | None = None) -> str:
+ return localplatform.get_home_path(UserType.ROOT if username == "root" else UserType.HOST_USER)
+
+async def is_systemd_unit_active(unit_name: str) -> bool:
+ return await localplatform.service_active(unit_name)
+
+async def stop_systemd_unit(unit_name: str) -> bool:
+ return await localplatform.service_stop(unit_name)
+
+async def start_systemd_unit(unit_name: str) -> bool:
+ return await localplatform.service_start(unit_name)
diff --git a/backend/decky_loader/injector.py b/backend/decky_loader/injector.py
new file mode 100644
index 00000000..a217f689
--- /dev/null
+++ b/backend/decky_loader/injector.py
@@ -0,0 +1,438 @@
+# Injector code from https://github.com/SteamDeckHomebrew/steamdeck-ui-inject. More info on how it works there.
+
+from asyncio import sleep
+from logging import getLogger
+from typing import Any, Callable, List, TypedDict, Dict
+
+from aiohttp import ClientSession
+from aiohttp.client_exceptions import ClientConnectorError, ClientOSError
+from asyncio.exceptions import TimeoutError
+import uuid
+
+BASE_ADDRESS = "http://localhost:8080"
+
+logger = getLogger("Injector")
+
+class _TabResponse(TypedDict):
+ title: str
+ id: str
+ url: str
+ webSocketDebuggerUrl: str
+
+class Tab:
+ cmd_id = 0
+
+ def __init__(self, res: _TabResponse) -> None:
+ self.title: str = res["title"]
+ self.id: str = res["id"]
+ self.url: str = res["url"]
+ self.ws_url: str = res["webSocketDebuggerUrl"]
+
+ self.websocket = None
+ self.client = None
+
+ async def open_websocket(self):
+ self.client = ClientSession()
+ self.websocket = await self.client.ws_connect(self.ws_url) # type: ignore
+
+ async def close_websocket(self):
+ if self.websocket:
+ await self.websocket.close()
+ if self.client:
+ await self.client.close()
+
+ async def listen_for_message(self):
+ if self.websocket:
+ async for message in self.websocket:
+ data = message.json()
+ yield data
+ logger.warn(f"The Tab {self.title} socket has been disconnected while listening for messages.")
+ await self.close_websocket()
+
+ async def _send_devtools_cmd(self, dc: Dict[str, Any], receive: bool = True):
+ if self.websocket:
+ self.cmd_id += 1
+ dc["id"] = self.cmd_id
+ await self.websocket.send_json(dc)
+ if receive:
+ async for msg in self.listen_for_message():
+ if "id" in msg and msg["id"] == dc["id"]:
+ return msg
+ return None
+ raise RuntimeError("Websocket not opened")
+
+ async def evaluate_js(self, js: str, run_async: bool | None = False, manage_socket: bool | None = True, get_result: bool = True):
+ try:
+ if manage_socket:
+ await self.open_websocket()
+
+ res = await self._send_devtools_cmd({
+ "method": "Runtime.evaluate",
+ "params": {
+ "expression": js,
+ "userGesture": True,
+ "awaitPromise": run_async
+ }
+ }, get_result)
+
+ finally:
+ if manage_socket:
+ await self.close_websocket()
+ return res
+
+ async def has_global_var(self, var_name: str, manage_socket: bool = True):
+ res = await self.evaluate_js(f"window['{var_name}'] !== null && window['{var_name}'] !== undefined", False, manage_socket)
+ assert res is not None
+
+ if not "result" in res or not "result" in res["result"] or not "value" in res["result"]["result"]:
+ return False
+
+ return res["result"]["result"]["value"]
+
+ async def close(self, manage_socket: bool = True):
+ try:
+ if manage_socket:
+ await self.open_websocket()
+
+ res = await self._send_devtools_cmd({
+ "method": "Page.close",
+ }, False)
+
+ finally:
+ if manage_socket:
+ await self.close_websocket()
+ return res
+
+ async def enable(self):
+ """
+ Enables page domain notifications.
+ """
+ await self._send_devtools_cmd({
+ "method": "Page.enable",
+ }, False)
+
+ async def disable(self):
+ """
+ Disables page domain notifications.
+ """
+ await self._send_devtools_cmd({
+ "method": "Page.disable",
+ }, False)
+
+ async def refresh(self, manage_socket: bool = True):
+ try:
+ if manage_socket:
+ await self.open_websocket()
+
+ await self._send_devtools_cmd({
+ "method": "Page.reload",
+ }, False)
+
+ finally:
+ if manage_socket:
+ await self.close_websocket()
+
+ return
+ async def reload_and_evaluate(self, js: str, manage_socket: bool = True):
+ """
+ Reloads the current tab, with JS to run on load via debugger
+ """
+ try:
+ if manage_socket:
+ await self.open_websocket()
+
+ await self._send_devtools_cmd({
+ "method": "Debugger.enable"
+ }, True)
+
+ await self._send_devtools_cmd({
+ "method": "Runtime.evaluate",
+ "params": {
+ "expression": "location.reload();",
+ "userGesture": True,
+ "awaitPromise": False
+ }
+ }, False)
+
+ breakpoint_res = await self._send_devtools_cmd({
+ "method": "Debugger.setInstrumentationBreakpoint",
+ "params": {
+ "instrumentation": "beforeScriptExecution"
+ }
+ }, True)
+
+ assert breakpoint_res is not None
+
+ logger.info(breakpoint_res)
+
+ # Page finishes loading when breakpoint hits
+
+ for _ in range(20):
+ # this works around 1/5 of the time, so just send it 8 times.
+ # the js accounts for being injected multiple times allowing only one instance to run at a time anyway
+ await self._send_devtools_cmd({
+ "method": "Runtime.evaluate",
+ "params": {
+ "expression": js,
+ "userGesture": True,
+ "awaitPromise": False
+ }
+ }, False)
+
+ await self._send_devtools_cmd({
+ "method": "Debugger.removeBreakpoint",
+ "params": {
+ "breakpointId": breakpoint_res["result"]["breakpointId"]
+ }
+ }, False)
+
+ for _ in range(4):
+ await self._send_devtools_cmd({
+ "method": "Debugger.resume"
+ }, False)
+
+ await self._send_devtools_cmd({
+ "method": "Debugger.disable"
+ }, True)
+
+ finally:
+ if manage_socket:
+ await self.close_websocket()
+ return
+
+ async def add_script_to_evaluate_on_new_document(self, js: str, add_dom_wrapper: bool = True, manage_socket: bool = True, get_result: bool = True):
+ """
+ How the underlying call functions is not particularly clear from the devtools docs, so stealing puppeteer's description:
+
+ Adds a function which would be invoked in one of the following scenarios:
+ * whenever the page is navigated
+ * whenever the child frame is attached or navigated. In this case, the
+ function is invoked in the context of the newly attached frame.
+
+ The function is invoked after the document was created but before any of
+ its scripts were run. This is useful to amend the JavaScript environment,
+ e.g. to seed `Math.random`.
+
+ Parameters
+ ----------
+ js : str
+ The script to evaluate on new document
+ add_dom_wrapper : bool
+ True to wrap the script in a wait for the 'DOMContentLoaded' event.
+ DOM will usually not exist when this execution happens,
+ so it is necessary to delay til DOM is loaded if you are modifying it
+ manage_socket : bool
+ True to have this function handle opening/closing the websocket for this tab
+ get_result : bool
+ True to wait for the result of this call
+
+ Returns
+ -------
+ int or None
+ The identifier of the script added, used to remove it later.
+ (see remove_script_to_evaluate_on_new_document below)
+ None is returned if `get_result` is False
+ """
+ try:
+
+ wrappedjs = """
+ function scriptFunc() {
+ {js}
+ }
+ if (document.readyState === 'loading') {
+ addEventListener('DOMContentLoaded', () => {
+ scriptFunc();
+ });
+ } else {
+ scriptFunc();
+ }
+ """.format(js=js) if add_dom_wrapper else js
+
+ if manage_socket:
+ await self.open_websocket()
+
+ res = await self._send_devtools_cmd({
+ "method": "Page.addScriptToEvaluateOnNewDocument",
+ "params": {
+ "source": wrappedjs
+ }
+ }, get_result)
+
+ finally:
+ if manage_socket:
+ await self.close_websocket()
+ return res
+
+ async def remove_script_to_evaluate_on_new_document(self, script_id: str, manage_socket: bool = True):
+ """
+ Removes a script from a page that was added with `add_script_to_evaluate_on_new_document`
+
+ Parameters
+ ----------
+ script_id : int
+ The identifier of the script to remove (returned from `add_script_to_evaluate_on_new_document`)
+ """
+
+ try:
+ if manage_socket:
+ await self.open_websocket()
+
+ await self._send_devtools_cmd({
+ "method": "Page.removeScriptToEvaluateOnNewDocument",
+ "params": {
+ "identifier": script_id
+ }
+ }, False)
+
+ finally:
+ if manage_socket:
+ await self.close_websocket()
+
+ async def has_element(self, element_name: str, manage_socket: bool = True):
+ res = await self.evaluate_js(f"document.getElementById('{element_name}') != null", False, manage_socket)
+ assert res is not None
+
+ if not "result" in res or not "result" in res["result"] or not "value" in res["result"]["result"]:
+ return False
+
+ return res["result"]["result"]["value"]
+
+ async def inject_css(self, style: str, manage_socket: bool = True):
+ try:
+ css_id = str(uuid.uuid4())
+
+ result = await self.evaluate_js(
+ f"""
+ (function() {{
+ const style = document.createElement('style');
+ style.id = "{css_id}";
+ document.head.append(style);
+ style.textContent = `{style}`;
+ }})()
+ """, False, manage_socket)
+
+ assert result is not None
+
+ if "exceptionDetails" in result["result"]:
+ return {
+ "success": False,
+ "result": result["result"]
+ }
+
+ return {
+ "success": True,
+ "result": css_id
+ }
+ except Exception as e:
+ return {
+ "success": False,
+ "result": e
+ }
+
+ async def remove_css(self, css_id: str, manage_socket: bool = True):
+ try:
+ result = await self.evaluate_js(
+ f"""
+ (function() {{
+ let style = document.getElementById("{css_id}");
+
+ if (style.nodeName.toLowerCase() == 'style')
+ style.parentNode.removeChild(style);
+ }})()
+ """, False, manage_socket)
+
+ assert result is not None
+
+ if "exceptionDetails" in result["result"]:
+ return {
+ "success": False,
+ "result": result
+ }
+
+ return {
+ "success": True
+ }
+ except Exception as e:
+ return {
+ "success": False,
+ "result": e
+ }
+
+ async def get_steam_resource(self, url: str):
+ res = await self.evaluate_js(f'(async function test() {{ return await (await fetch("{url}")).text() }})()', True)
+ assert res is not None
+ return res["result"]["result"]["value"]
+
+ def __repr__(self):
+ return self.title
+
+
+async def get_tabs() -> List[Tab]:
+ res = {}
+
+ na = False
+ while True:
+ try:
+ async with ClientSession() as web:
+ res = await web.get(f"{BASE_ADDRESS}/json", timeout=3)
+ except ClientConnectorError:
+ if not na:
+ logger.debug("Steam isn't available yet. Wait for a moment...")
+ na = True
+ await sleep(5)
+ except ClientOSError:
+ logger.warn(f"The request to {BASE_ADDRESS}/json was reset")
+ await sleep(1)
+ except TimeoutError:
+ logger.warn(f"The request to {BASE_ADDRESS}/json timed out")
+ await sleep(1)
+ else:
+ break
+
+ if res.status == 200:
+ r = await res.json()
+ return [Tab(i) for i in r]
+ else:
+ raise Exception(f"/json did not return 200. {await res.text()}")
+
+
+async def get_tab(tab_name: str) -> Tab:
+ tabs = await get_tabs()
+ tab = next((i for i in tabs if i.title == tab_name), None)
+ if not tab:
+ raise ValueError(f"Tab {tab_name} not found")
+ return tab
+
+async def get_tab_lambda(test: Callable[[Tab], bool]) -> Tab:
+ tabs = await get_tabs()
+ tab = next((i for i in tabs if test(i)), None)
+ if not tab:
+ raise ValueError(f"Tab not found by lambda")
+ return tab
+
+SHARED_CTX_NAMES = ["SharedJSContext", "Steam Shared Context presented by Valveâ„¢", "Steam", "SP"]
+CLOSEABLE_URLS = ["about:blank", "data:text/html,%3Cbody%3E%3C%2Fbody%3E"] # Closing anything other than these *really* likes to crash Steam
+DO_NOT_CLOSE_URL = "Valve Steam Gamepad/default" # Steam Big Picture Mode tab
+
+def tab_is_gamepadui(t: Tab) -> bool:
+ return "https://steamloopback.host/routes/" in t.url and t.title in SHARED_CTX_NAMES
+
+async def get_gamepadui_tab() -> Tab:
+ tabs = await get_tabs()
+ tab = next((i for i in tabs if tab_is_gamepadui(i)), None)
+ if not tab:
+ raise ValueError(f"GamepadUI Tab not found")
+ return tab
+
+async def inject_to_tab(tab_name: str, js: str, run_async: bool = False):
+ tab = await get_tab(tab_name)
+
+ return await tab.evaluate_js(js, run_async)
+
+async def close_old_tabs():
+ tabs = await get_tabs()
+ for t in tabs:
+ if not t.title or (t.title not in SHARED_CTX_NAMES and any(url in t.url for url in CLOSEABLE_URLS) and DO_NOT_CLOSE_URL not in t.url):
+ logger.debug("Closing tab: " + getattr(t, "title", "Untitled"))
+ await t.close()
+ await sleep(0.5)
diff --git a/backend/decky_loader/loader.py b/backend/decky_loader/loader.py
new file mode 100644
index 00000000..7567912c
--- /dev/null
+++ b/backend/decky_loader/loader.py
@@ -0,0 +1,200 @@
+from __future__ import annotations
+from asyncio import AbstractEventLoop, Queue, sleep
+from json.decoder import JSONDecodeError
+from logging import getLogger
+from os import listdir, path
+from pathlib import Path
+from traceback import print_exc
+from typing import Any, Tuple
+
+from aiohttp import web
+from os.path import exists
+from watchdog.events import RegexMatchingEventHandler, DirCreatedEvent, DirModifiedEvent, FileCreatedEvent, FileModifiedEvent # type: ignore
+from watchdog.observers import Observer # type: ignore
+
+from typing import TYPE_CHECKING
+if TYPE_CHECKING:
+ from .main import PluginManager
+
+from .injector import get_gamepadui_tab
+from .plugin.plugin import PluginWrapper
+
+Plugins = dict[str, PluginWrapper]
+ReloadQueue = Queue[Tuple[str, str, bool | None] | Tuple[str, str]]
+
+#TODO: Remove placeholder method
+async def log_plugin_emitted_message(message: Any):
+ getLogger().debug(f"EMITTED MESSAGE: " + str(message))
+
+class FileChangeHandler(RegexMatchingEventHandler):
+ def __init__(self, queue: ReloadQueue, plugin_path: str) -> None:
+ super().__init__(regexes=[r'^.*?dist\/index\.js$', r'^.*?main\.py$']) # type: ignore
+ self.logger = getLogger("file-watcher")
+ self.plugin_path = plugin_path
+ self.queue = queue
+ self.disabled = True
+
+ def maybe_reload(self, src_path: str):
+ if self.disabled:
+ return
+ plugin_dir = Path(path.relpath(src_path, self.plugin_path)).parts[0]
+ if exists(path.join(self.plugin_path, plugin_dir, "plugin.json")):
+ self.queue.put_nowait((path.join(self.plugin_path, plugin_dir, "main.py"), plugin_dir, True))
+
+ def on_created(self, event: DirCreatedEvent | FileCreatedEvent):
+ src_path = event.src_path
+ if "__pycache__" in src_path:
+ return
+
+ # check to make sure this isn't a directory
+ if path.isdir(src_path):
+ return
+
+ # get the directory name of the plugin so that we can find its "main.py" and reload it; the
+ # file that changed is not necessarily the one that needs to be reloaded
+ self.logger.debug(f"file created: {src_path}")
+ self.maybe_reload(src_path)
+
+ def on_modified(self, event: DirModifiedEvent | FileModifiedEvent):
+ src_path = event.src_path
+ if "__pycache__" in src_path:
+ return
+
+ # check to make sure this isn't a directory
+ if path.isdir(src_path):
+ return
+
+ # get the directory name of the plugin so that we can find its "main.py" and reload it; the
+ # file that changed is not necessarily the one that needs to be reloaded
+ self.logger.debug(f"file modified: {src_path}")
+ self.maybe_reload(src_path)
+
+class Loader:
+ def __init__(self, server_instance: PluginManager, plugin_path: str, loop: AbstractEventLoop, live_reload: bool = False) -> None:
+ self.loop = loop
+ self.logger = getLogger("Loader")
+ self.plugin_path = plugin_path
+ self.logger.info(f"plugin_path: {self.plugin_path}")
+ self.plugins: Plugins = {}
+ self.watcher = None
+ self.live_reload = live_reload
+ self.reload_queue: ReloadQueue = Queue()
+ self.loop.create_task(self.handle_reloads())
+
+ if live_reload:
+ self.observer = Observer()
+ self.watcher = FileChangeHandler(self.reload_queue, plugin_path)
+ self.observer.schedule(self.watcher, self.plugin_path, recursive=True) # type: ignore
+ self.observer.start()
+ self.loop.create_task(self.enable_reload_wait())
+
+ server_instance.web_app.add_routes([
+ web.get("/frontend/{path:.*}", self.handle_frontend_assets),
+ web.get("/locales/{path:.*}", self.handle_frontend_locales),
+ web.get("/plugins", self.get_plugins),
+ web.get("/plugins/{plugin_name}/frontend_bundle", self.handle_frontend_bundle),
+ web.post("/plugins/{plugin_name}/methods/{method_name}", self.handle_plugin_method_call),
+ web.get("/plugins/{plugin_name}/assets/{path:.*}", self.handle_plugin_frontend_assets),
+ web.post("/plugins/{plugin_name}/reload", self.handle_backend_reload_request)
+ ])
+
+ async def enable_reload_wait(self):
+ if self.live_reload:
+ await sleep(10)
+ if self.watcher:
+ self.logger.info("Hot reload enabled")
+ self.watcher.disabled = False
+
+ async def handle_frontend_assets(self, request: web.Request):
+ file = Path(__file__).parents[1].joinpath("static").joinpath(request.match_info["path"])
+ return web.FileResponse(file, headers={"Cache-Control": "no-cache"})
+
+ async def handle_frontend_locales(self, request: web.Request):
+ req_lang = request.match_info["path"]
+ file = Path(__file__).parents[1].joinpath("locales").joinpath(req_lang)
+ if exists(file):
+ return web.FileResponse(file, headers={"Cache-Control": "no-cache", "Content-Type": "application/json"})
+ else:
+ self.logger.info(f"Language {req_lang} not available, returning an empty dictionary")
+ return web.json_response(data={}, headers={"Cache-Control": "no-cache"})
+
+ async def get_plugins(self, request: web.Request):
+ plugins = list(self.plugins.values())
+ return web.json_response([{"name": str(i), "version": i.version} for i in plugins])
+
+ async def handle_plugin_frontend_assets(self, request: web.Request):
+ plugin = self.plugins[request.match_info["plugin_name"]]
+ file = path.join(self.plugin_path, plugin.plugin_directory, "dist/assets", request.match_info["path"])
+
+ return web.FileResponse(file, headers={"Cache-Control": "no-cache"})
+
+ async def handle_frontend_bundle(self, request: web.Request):
+ plugin = self.plugins[request.match_info["plugin_name"]]
+
+ with open(path.join(self.plugin_path, plugin.plugin_directory, "dist/index.js"), "r", encoding="utf-8") as bundle:
+ return web.Response(text=bundle.read(), content_type="application/javascript")
+
+ def import_plugin(self, file: str, plugin_directory: str, refresh: bool | None = False, batch: bool | None = False):
+ try:
+ plugin = PluginWrapper(file, plugin_directory, self.plugin_path)
+ if plugin.name in self.plugins:
+ if not "debug" in plugin.flags and refresh:
+ self.logger.info(f"Plugin {plugin.name} is already loaded and has requested to not be re-loaded")
+ return
+ else:
+ self.plugins[plugin.name].stop()
+ self.plugins.pop(plugin.name, None)
+ if plugin.passive:
+ self.logger.info(f"Plugin {plugin.name} is passive")
+ self.plugins[plugin.name] = plugin.start()
+ self.plugins[plugin.name].set_emitted_message_callback(log_plugin_emitted_message)
+ self.logger.info(f"Loaded {plugin.name}")
+ if not batch:
+ self.loop.create_task(self.dispatch_plugin(plugin.name, plugin.version))
+ except Exception as e:
+ self.logger.error(f"Could not load {file}. {e}")
+ print_exc()
+
+ async def dispatch_plugin(self, name: str, version: str | None):
+ gpui_tab = await get_gamepadui_tab()
+ await gpui_tab.evaluate_js(f"window.importDeckyPlugin('{name}', '{version}')")
+
+ def import_plugins(self):
+ self.logger.info(f"import plugins from {self.plugin_path}")
+
+ directories = [i for i in listdir(self.plugin_path) if path.isdir(path.join(self.plugin_path, i)) and path.isfile(path.join(self.plugin_path, i, "plugin.json"))]
+ for directory in directories:
+ self.logger.info(f"found plugin: {directory}")
+ self.import_plugin(path.join(self.plugin_path, directory, "main.py"), directory, False, True)
+
+ async def handle_reloads(self):
+ while True:
+ args = await self.reload_queue.get()
+ self.import_plugin(*args) # type: ignore
+
+ async def handle_plugin_method_call(self, request: web.Request):
+ res = {}
+ plugin = self.plugins[request.match_info["plugin_name"]]
+ method_name = request.match_info["method_name"]
+ try:
+ method_info = await request.json()
+ args: Any = method_info["args"]
+ except JSONDecodeError:
+ args = {}
+ try:
+ if method_name.startswith("_"):
+ raise RuntimeError("Tried to call private method")
+ res["result"] = await plugin.execute_method(method_name, args)
+ res["success"] = True
+ except Exception as e:
+ res["result"] = str(e)
+ res["success"] = False
+ return web.json_response(res)
+
+ async def handle_backend_reload_request(self, request: web.Request):
+ plugin_name : str = request.match_info["plugin_name"]
+ plugin = self.plugins[plugin_name]
+
+ await self.reload_queue.put((plugin.file, plugin.plugin_directory))
+
+ return web.Response(status=200) \ No newline at end of file
diff --git a/backend/decky_loader/localplatform/localplatform.py b/backend/decky_loader/localplatform/localplatform.py
new file mode 100644
index 00000000..028eff8f
--- /dev/null
+++ b/backend/decky_loader/localplatform/localplatform.py
@@ -0,0 +1,52 @@
+import platform, os
+
+ON_WINDOWS = platform.system() == "Windows"
+ON_LINUX = not ON_WINDOWS
+
+if ON_WINDOWS:
+ from .localplatformwin import *
+ from . import localplatformwin as localplatform
+else:
+ from .localplatformlinux import *
+ from . import localplatformlinux as localplatform
+
+def get_privileged_path() -> str:
+ '''Get path accessible by elevated user. Holds plugins, decky loader and decky loader configs'''
+ return localplatform.get_privileged_path()
+
+def get_unprivileged_path() -> str:
+ '''Get path accessible by non-elevated user. Holds plugin configuration, plugin data and plugin logs. Externally referred to as the 'Homebrew' directory'''
+ return localplatform.get_unprivileged_path()
+
+def get_unprivileged_user() -> str:
+ '''Get user that should own files made in unprivileged path'''
+ return localplatform.get_unprivileged_user()
+
+def get_chown_plugin_path() -> bool:
+ return os.getenv("CHOWN_PLUGIN_PATH", "1") == "1"
+
+def get_server_host() -> str:
+ return os.getenv("SERVER_HOST", "127.0.0.1")
+
+def get_server_port() -> int:
+ return int(os.getenv("SERVER_PORT", "1337"))
+
+def get_live_reload() -> bool:
+ return os.getenv("LIVE_RELOAD", "1") == "1"
+
+def get_keep_systemd_service() -> bool:
+ return os.getenv("KEEP_SYSTEMD_SERVICE", "0") == "1"
+
+def get_log_level() -> int:
+ return {"CRITICAL": 50, "ERROR": 40, "WARNING": 30, "INFO": 20, "DEBUG": 10}[
+ os.getenv("LOG_LEVEL", "INFO")
+ ]
+
+def get_selinux() -> bool:
+ if ON_LINUX:
+ from subprocess import check_output
+ try:
+ if (check_output("getenforce").decode("ascii").strip("\n") == "Enforcing"): return True
+ except FileNotFoundError:
+ pass
+ return False
diff --git a/backend/decky_loader/localplatform/localplatformlinux.py b/backend/decky_loader/localplatform/localplatformlinux.py
new file mode 100644
index 00000000..d5bea6ab
--- /dev/null
+++ b/backend/decky_loader/localplatform/localplatformlinux.py
@@ -0,0 +1,201 @@
+import os, pwd, grp, sys, logging
+from subprocess import call, run, DEVNULL, PIPE, STDOUT
+from ..customtypes import UserType
+
+logger = logging.getLogger("localplatform")
+
+# Get the user id hosting the plugin loader
+def _get_user_id() -> int:
+ return pwd.getpwnam(_get_user()).pw_uid
+
+# Get the user hosting the plugin loader
+def _get_user() -> str:
+ return get_unprivileged_user()
+
+# Get the effective user id of the running process
+def _get_effective_user_id() -> int:
+ return os.geteuid()
+
+# Get the effective user of the running process
+def _get_effective_user() -> str:
+ return pwd.getpwuid(_get_effective_user_id()).pw_name
+
+# Get the effective user group id of the running process
+def _get_effective_user_group_id() -> int:
+ return os.getegid()
+
+# Get the effective user group of the running process
+def _get_effective_user_group() -> str:
+ return grp.getgrgid(_get_effective_user_group_id()).gr_name
+
+# Get the user owner of the given file path.
+def _get_user_owner(file_path: str) -> str:
+ return pwd.getpwuid(os.stat(file_path).st_uid).pw_name
+
+# Get the user group of the given file path, or the user group hosting the plugin loader
+def _get_user_group(file_path: str | None = None) -> str:
+ return grp.getgrgid(os.stat(file_path).st_gid if file_path is not None else _get_user_group_id()).gr_name
+
+# Get the group id of the user hosting the plugin loader
+def _get_user_group_id() -> int:
+ return pwd.getpwuid(_get_user_id()).pw_gid
+
+def chown(path : str, user : UserType = UserType.HOST_USER, recursive : bool = True) -> bool:
+ user_str = ""
+
+ if user == UserType.HOST_USER:
+ user_str = _get_user()+":"+_get_user_group()
+ elif user == UserType.EFFECTIVE_USER:
+ user_str = _get_effective_user()+":"+_get_effective_user_group()
+ elif user == UserType.ROOT:
+ user_str = "root:root"
+ else:
+ raise Exception("Unknown User Type")
+
+ result = call(["chown", "-R", user_str, path] if recursive else ["chown", user_str, path])
+ return result == 0
+
+def chmod(path : str, permissions : int, recursive : bool = True) -> bool:
+ if _get_effective_user_id() != 0:
+ return True
+ result = call(["chmod", "-R", str(permissions), path] if recursive else ["chmod", str(permissions), path])
+ return result == 0
+
+def folder_owner(path : str) -> UserType|None:
+ user_owner = _get_user_owner(path)
+
+ if (user_owner == _get_user()):
+ return UserType.HOST_USER
+
+ elif (user_owner == _get_effective_user()):
+ return UserType.EFFECTIVE_USER
+
+ else:
+ return None
+
+def get_home_path(user : UserType = UserType.HOST_USER) -> str:
+ user_name = "root"
+
+ if user == UserType.HOST_USER:
+ user_name = _get_user()
+ elif user == UserType.EFFECTIVE_USER:
+ user_name = _get_effective_user()
+ elif user == UserType.ROOT:
+ pass
+ else:
+ raise Exception("Unknown User Type")
+
+ return pwd.getpwnam(user_name).pw_dir
+
+def get_username() -> str:
+ return _get_user()
+
+def setgid(user : UserType = UserType.HOST_USER):
+ user_id = 0
+
+ if user == UserType.HOST_USER:
+ user_id = _get_user_group_id()
+ elif user == UserType.ROOT:
+ pass
+ else:
+ raise Exception("Unknown user type")
+
+ os.setgid(user_id)
+
+def setuid(user : UserType = UserType.HOST_USER):
+ user_id = 0
+
+ if user == UserType.HOST_USER:
+ user_id = _get_user_id()
+ elif user == UserType.ROOT:
+ pass
+ else:
+ raise Exception("Unknown user type")
+
+ os.setuid(user_id)
+
+async def service_active(service_name : str) -> bool:
+ res = run(["systemctl", "is-active", service_name], stdout=DEVNULL, stderr=DEVNULL)
+ return res.returncode == 0
+
+async def service_restart(service_name : str) -> bool:
+ call(["systemctl", "daemon-reload"])
+ cmd = ["systemctl", "restart", service_name]
+ res = run(cmd, stdout=PIPE, stderr=STDOUT)
+ return res.returncode == 0
+
+async def service_stop(service_name : str) -> bool:
+ cmd = ["systemctl", "stop", service_name]
+ res = run(cmd, stdout=PIPE, stderr=STDOUT)
+ return res.returncode == 0
+
+async def service_start(service_name : str) -> bool:
+ cmd = ["systemctl", "start", service_name]
+ res = run(cmd, stdout=PIPE, stderr=STDOUT)
+ return res.returncode == 0
+
+def get_privileged_path() -> str:
+ path = os.getenv("PRIVILEGED_PATH")
+
+ if path == None:
+ path = get_unprivileged_path()
+
+ os.makedirs(path, exist_ok=True)
+
+ return path
+
+def _parent_dir(path : str | None) -> str | None:
+ if path == None:
+ return None
+
+ if path.endswith('/'):
+ path = path[:-1]
+
+ return os.path.dirname(path)
+
+def get_unprivileged_path() -> str:
+ path = os.getenv("UNPRIVILEGED_PATH")
+
+ if path == None:
+ path = _parent_dir(os.getenv("PLUGIN_PATH"))
+
+ if path == None:
+ logger.debug("Unprivileged path is not properly configured. Making something up!")
+
+ if hasattr(sys, 'frozen'):
+ # Expected path of loader binary is /home/deck/homebrew/service/PluginLoader
+ path = _parent_dir(_parent_dir(os.path.realpath(sys.argv[0])))
+ else:
+ # Expected path of this file is $src_root/backend/src/localplatformlinux.py
+ path = _parent_dir(_parent_dir(_parent_dir(__file__)))
+
+ if path != None and not os.path.exists(path):
+ path = None
+
+ if path == None:
+ logger.warn("Unprivileged path is not properly configured. Defaulting to /home/deck/homebrew")
+ path = "/home/deck/homebrew" # We give up
+
+ os.makedirs(path, exist_ok=True)
+
+ return path
+
+
+def get_unprivileged_user() -> str:
+ user = os.getenv("UNPRIVILEGED_USER")
+
+ if user == None:
+ # Lets hope we can extract it from the unprivileged dir
+ dir = os.path.realpath(get_unprivileged_path())
+
+ pws = sorted(pwd.getpwall(), reverse=True, key=lambda pw: len(pw.pw_dir))
+ for pw in pws:
+ if dir.startswith(os.path.realpath(pw.pw_dir)):
+ user = pw.pw_name
+ break
+
+ if user == None:
+ logger.warn("Unprivileged user is not properly configured. Defaulting to 'deck'")
+ user = 'deck'
+
+ return user
diff --git a/backend/decky_loader/localplatform/localplatformwin.py b/backend/decky_loader/localplatform/localplatformwin.py
new file mode 100644
index 00000000..f1a5be17
--- /dev/null
+++ b/backend/decky_loader/localplatform/localplatformwin.py
@@ -0,0 +1,55 @@
+from ..customtypes import UserType
+import os, sys
+
+def chown(path : str, user : UserType = UserType.HOST_USER, recursive : bool = True) -> bool:
+ return True # Stubbed
+
+def chmod(path : str, permissions : int, recursive : bool = True) -> bool:
+ return True # Stubbed
+
+def folder_owner(path : str) -> UserType|None:
+ return UserType.HOST_USER # Stubbed
+
+def get_home_path(user : UserType = UserType.HOST_USER) -> str:
+ return os.path.expanduser("~") # Mostly stubbed
+
+def setgid(user : UserType = UserType.HOST_USER):
+ pass # Stubbed
+
+def setuid(user : UserType = UserType.HOST_USER):
+ pass # Stubbed
+
+async def service_active(service_name : str) -> bool:
+ return True # Stubbed
+
+async def service_stop(service_name : str) -> bool:
+ return True # Stubbed
+
+async def service_start(service_name : str) -> bool:
+ return True # Stubbed
+
+async def service_restart(service_name : str) -> bool:
+ if service_name == "plugin_loader":
+ sys.exit(42)
+
+ return True # Stubbed
+
+def get_username() -> str:
+ return os.getlogin()
+
+def get_privileged_path() -> str:
+ '''On windows, privileged_path is equal to unprivileged_path'''
+ return get_unprivileged_path()
+
+def get_unprivileged_path() -> str:
+ path = os.getenv("UNPRIVILEGED_PATH")
+
+ if path == None:
+ path = os.getenv("PRIVILEGED_PATH", os.path.join(os.path.expanduser("~"), "homebrew"))
+
+ os.makedirs(path, exist_ok=True)
+
+ return path
+
+def get_unprivileged_user() -> str:
+ return os.getenv("UNPRIVILEGED_USER", os.getlogin())
diff --git a/backend/decky_loader/localplatform/localsocket.py b/backend/decky_loader/localplatform/localsocket.py
new file mode 100644
index 00000000..93b1ea18
--- /dev/null
+++ b/backend/decky_loader/localplatform/localsocket.py
@@ -0,0 +1,145 @@
+import asyncio, time
+from typing import Any, Callable, Coroutine
+import random
+
+from .localplatform import ON_WINDOWS
+
+BUFFER_LIMIT = 2 ** 20 # 1 MiB
+
+class UnixSocket:
+ def __init__(self, on_new_message: Callable[[str], Coroutine[Any, Any, Any]]):
+ '''
+ on_new_message takes 1 string argument.
+ It's return value gets used, if not None, to write data to the socket.
+ Method should be async
+ '''
+ self.socket_addr = f"/tmp/plugin_socket_{time.time()}"
+ self.on_new_message = on_new_message
+ self.socket = None
+ self.reader = None
+ self.writer = None
+ self.server_writer = None
+
+ async def setup_server(self):
+ self.socket = await asyncio.start_unix_server(self._listen_for_method_call, path=self.socket_addr, limit=BUFFER_LIMIT)
+
+ async def _open_socket_if_not_exists(self):
+ if not self.reader:
+ retries = 0
+ while retries < 10:
+ try:
+ self.reader, self.writer = await asyncio.open_unix_connection(self.socket_addr, limit=BUFFER_LIMIT)
+ return True
+ except:
+ await asyncio.sleep(2)
+ retries += 1
+ return False
+ else:
+ return True
+
+ async def get_socket_connection(self):
+ if not await self._open_socket_if_not_exists():
+ return None, None
+
+ return self.reader, self.writer
+
+ async def close_socket_connection(self):
+ if self.writer != None:
+ self.writer.close()
+
+ self.reader = None
+
+ async def read_single_line(self) -> str|None:
+ reader, _ = await self.get_socket_connection()
+
+ try:
+ assert reader
+ except AssertionError:
+ return
+
+ return await self._read_single_line(reader)
+
+ async def write_single_line(self, message : str):
+ _, writer = await self.get_socket_connection()
+
+ try:
+ assert writer
+ except AssertionError:
+ return
+
+ await self._write_single_line(writer, message)
+
+ async def _read_single_line(self, reader: asyncio.StreamReader) -> str:
+ line = bytearray()
+ while True:
+ try:
+ line.extend(await reader.readuntil())
+ except asyncio.LimitOverrunError:
+ line.extend(await reader.read(reader._limit)) # type: ignore
+ continue
+ except asyncio.IncompleteReadError as err:
+ line.extend(err.partial)
+ break
+ else:
+ break
+
+ return line.decode("utf-8")
+
+ async def _write_single_line(self, writer: asyncio.StreamWriter, message : str):
+ if not message.endswith("\n"):
+ message += "\n"
+
+ writer.write(message.encode("utf-8"))
+ await writer.drain()
+
+ async def write_single_line_server(self, message: str):
+ if self.server_writer is None:
+ return
+ await self._write_single_line(self.server_writer, message)
+
+ async def _listen_for_method_call(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
+ self.server_writer = writer
+ while True:
+
+ def _(task: asyncio.Task[str|None]):
+ res = task.result()
+ if res is not None:
+ asyncio.create_task(self._write_single_line(writer, res))
+
+ line = await self._read_single_line(reader)
+ asyncio.create_task(self.on_new_message(line)).add_done_callback(_)
+
+class PortSocket (UnixSocket):
+ def __init__(self, on_new_message: Callable[[str], Coroutine[Any, Any, Any]]):
+ '''
+ on_new_message takes 1 string argument.
+ It's return value gets used, if not None, to write data to the socket.
+ Method should be async
+ '''
+ super().__init__(on_new_message)
+ self.host = "127.0.0.1"
+ self.port = random.sample(range(40000, 60000), 1)[0]
+
+ async def setup_server(self):
+ self.socket = await asyncio.start_server(self._listen_for_method_call, host=self.host, port=self.port, limit=BUFFER_LIMIT)
+
+ async def _open_socket_if_not_exists(self):
+ if not self.reader:
+ retries = 0
+ while retries < 10:
+ try:
+ self.reader, self.writer = await asyncio.open_connection(host=self.host, port=self.port, limit=BUFFER_LIMIT)
+ return True
+ except:
+ await asyncio.sleep(2)
+ retries += 1
+ return False
+ else:
+ return True
+
+if ON_WINDOWS:
+ class LocalSocket (PortSocket): # type: ignore
+ pass
+else:
+ class LocalSocket (UnixSocket):
+ pass \ No newline at end of file
diff --git a/backend/decky_loader/main.py b/backend/decky_loader/main.py
new file mode 100644
index 00000000..fae30574
--- /dev/null
+++ b/backend/decky_loader/main.py
@@ -0,0 +1,188 @@
+# Change PyInstaller files permissions
+import sys
+from typing import Dict
+from .localplatform.localplatform import (chmod, chown, service_stop, service_start,
+ ON_WINDOWS, get_log_level, get_live_reload,
+ get_server_port, get_server_host, get_chown_plugin_path,
+ get_privileged_path)
+if hasattr(sys, '_MEIPASS'):
+ chmod(sys._MEIPASS, 755) # type: ignore
+# Full imports
+from asyncio import AbstractEventLoop, new_event_loop, set_event_loop, sleep
+from logging import basicConfig, getLogger
+from os import path
+from traceback import format_exc
+import multiprocessing
+
+import aiohttp_cors # type: ignore
+# Partial imports
+from aiohttp import client_exceptions
+from aiohttp.web import Application, Response, Request, get, run_app, static # type: ignore
+from aiohttp_jinja2 import setup as jinja_setup
+
+# local modules
+from .browser import PluginBrowser
+from .helpers import (REMOTE_DEBUGGER_UNIT, csrf_middleware, get_csrf_token, get_loader_version,
+ mkdir_as_user, get_system_pythonpaths, get_effective_user_id)
+
+from .injector import get_gamepadui_tab, Tab, close_old_tabs
+from .loader import Loader
+from .settings import SettingsManager
+from .updater import Updater
+from .utilities import Utilities
+from .customtypes import UserType
+
+
+basicConfig(
+ level=get_log_level(),
+ format="[%(module)s][%(levelname)s]: %(message)s"
+)
+
+logger = getLogger("Main")
+plugin_path = path.join(get_privileged_path(), "plugins")
+
+def chown_plugin_dir():
+ if not path.exists(plugin_path): # For safety, create the folder before attempting to do anything with it
+ mkdir_as_user(plugin_path)
+
+ if not chown(plugin_path, UserType.HOST_USER) or not chmod(plugin_path, 555):
+ logger.error(f"chown/chmod exited with a non-zero exit code")
+
+if get_chown_plugin_path() == True:
+ chown_plugin_dir()
+
+class PluginManager:
+ def __init__(self, loop: AbstractEventLoop) -> None:
+ self.loop = loop
+ self.web_app = Application()
+ self.web_app.middlewares.append(csrf_middleware)
+ self.cors = aiohttp_cors.setup(self.web_app, defaults={
+ "https://steamloopback.host": aiohttp_cors.ResourceOptions(
+ expose_headers="*",
+ allow_headers="*",
+ allow_credentials=True
+ )
+ })
+ self.plugin_loader = Loader(self, plugin_path, self.loop, get_live_reload())
+ self.settings = SettingsManager("loader", path.join(get_privileged_path(), "settings"))
+ self.plugin_browser = PluginBrowser(plugin_path, self.plugin_loader.plugins, self.plugin_loader, self.settings)
+ self.utilities = Utilities(self)
+ self.updater = Updater(self)
+
+ jinja_setup(self.web_app)
+
+ async def startup(_: Application):
+ if self.settings.getSetting("cef_forward", False):
+ self.loop.create_task(service_start(REMOTE_DEBUGGER_UNIT))
+ else:
+ self.loop.create_task(service_stop(REMOTE_DEBUGGER_UNIT))
+ self.loop.create_task(self.loader_reinjector())
+ self.loop.create_task(self.load_plugins())
+
+ self.web_app.on_startup.append(startup)
+
+ self.loop.set_exception_handler(self.exception_handler)
+ self.web_app.add_routes([get("/auth/token", self.get_auth_token)])
+
+ for route in list(self.web_app.router.routes()):
+ self.cors.add(route) # type: ignore
+ self.web_app.add_routes([static("/static", path.join(path.dirname(__file__), '..', 'static'))])
+
+ def exception_handler(self, loop: AbstractEventLoop, context: Dict[str, str]):
+ if context["message"] == "Unclosed connection":
+ return
+ loop.default_exception_handler(context)
+
+ async def get_auth_token(self, request: Request):
+ return Response(text=get_csrf_token())
+
+ async def load_plugins(self):
+ # await self.wait_for_server()
+ logger.debug("Loading plugins")
+ self.plugin_loader.import_plugins()
+ # await inject_to_tab("SP", "window.syncDeckyPlugins();")
+ if self.settings.getSetting("pluginOrder", None) == None:
+ self.settings.setSetting("pluginOrder", list(self.plugin_loader.plugins.keys()))
+ logger.debug("Did not find pluginOrder setting, set it to default")
+
+ async def loader_reinjector(self):
+ while True:
+ tab = None
+ nf = False
+ dc = False
+ while not tab:
+ try:
+ tab = await get_gamepadui_tab()
+ except (client_exceptions.ClientConnectorError, client_exceptions.ServerDisconnectedError):
+ if not dc:
+ logger.debug("Couldn't connect to debugger, waiting...")
+ dc = True
+ pass
+ except ValueError:
+ if not nf:
+ logger.debug("Couldn't find GamepadUI tab, waiting...")
+ nf = True
+ pass
+ if not tab:
+ await sleep(5)
+ await tab.open_websocket()
+ await tab.enable()
+ await self.inject_javascript(tab, True)
+ try:
+ async for msg in tab.listen_for_message():
+ # this gets spammed a lot
+ if msg.get("method", None) != "Page.navigatedWithinDocument":
+ logger.debug("Page event: " + str(msg.get("method", None)))
+ if msg.get("method", None) == "Page.domContentEventFired":
+ if not await tab.has_global_var("deckyHasLoaded", False):
+ await self.inject_javascript(tab)
+ if msg.get("method", None) == "Inspector.detached":
+ logger.info("CEF has requested that we detach.")
+ await tab.close_websocket()
+ break
+ # If this is a forceful disconnect the loop will just stop without any failure message. In this case, injector.py will handle this for us so we don't need to close the socket.
+ # This is because of https://github.com/aio-libs/aiohttp/blob/3ee7091b40a1bc58a8d7846e7878a77640e96996/aiohttp/client_ws.py#L321
+ logger.info("CEF has disconnected...")
+ # At this point the loop starts again and we connect to the freshly started Steam client once it is ready.
+ except Exception:
+ logger.error("Exception while reading page events " + format_exc())
+ await tab.close_websocket()
+ pass
+ # while True:
+ # await sleep(5)
+ # if not await tab.has_global_var("deckyHasLoaded", False):
+ # logger.info("Plugin loader isn't present in Steam anymore, reinjecting...")
+ # await self.inject_javascript(tab)
+
+ async def inject_javascript(self, tab: Tab, first: bool=False, request: Request|None=None):
+ logger.info("Loading Decky frontend!")
+ try:
+ if first:
+ if await tab.has_global_var("deckyHasLoaded", False):
+ await close_old_tabs()
+ await tab.evaluate_js("try{if (window.deckyHasLoaded){setTimeout(() => location.reload(), 100)}else{window.deckyHasLoaded = true;(async()=>{try{while(!window.SP_REACT){await new Promise(r => setTimeout(r, 10))};await import('http://localhost:1337/frontend/index.js?v=%s')}catch(e){console.error(e)};})();}}catch(e){console.error(e)}" % (get_loader_version(), ), False, False, False)
+ except:
+ logger.info("Failed to inject JavaScript into tab\n" + format_exc())
+ pass
+
+ def run(self):
+ return run_app(self.web_app, host=get_server_host(), port=get_server_port(), loop=self.loop, access_log=None)
+
+def main():
+ if ON_WINDOWS:
+ # Fix windows/flask not recognising that .js means 'application/javascript'
+ import mimetypes
+ mimetypes.add_type('application/javascript', '.js')
+
+ # Required for multiprocessing support in frozen files
+ multiprocessing.freeze_support()
+ else:
+ if get_effective_user_id() != 0:
+ logger.warning(f"decky is running as an unprivileged user, this is not officially supported and may cause issues")
+
+ # Append the system and user python paths
+ sys.path.extend(get_system_pythonpaths())
+
+ loop = new_event_loop()
+ set_event_loop(loop)
+ PluginManager(loop).run()
diff --git a/backend/decky_loader/plugin/method_call_request.py b/backend/decky_loader/plugin/method_call_request.py
new file mode 100644
index 00000000..cebe34f8
--- /dev/null
+++ b/backend/decky_loader/plugin/method_call_request.py
@@ -0,0 +1,29 @@
+from typing import Any, TypedDict
+from uuid import uuid4
+from asyncio import Event
+
+class SocketResponseDict(TypedDict):
+ id: str
+ success: bool
+ res: Any
+
+class MethodCallResponse:
+ def __init__(self, success: bool, result: Any) -> None:
+ self.success = success
+ self.result = result
+
+class MethodCallRequest:
+ def __init__(self) -> None:
+ self.id = str(uuid4())
+ self.event = Event()
+ self.response: MethodCallResponse
+
+ def set_result(self, dc: SocketResponseDict):
+ self.response = MethodCallResponse(dc["success"], dc["res"])
+ self.event.set()
+
+ async def wait_for_result(self):
+ await self.event.wait()
+ if not self.response.success:
+ raise Exception(self.response.result)
+ return self.response.result \ No newline at end of file
diff --git a/backend/decky_loader/plugin/plugin.py b/backend/decky_loader/plugin/plugin.py
new file mode 100644
index 00000000..6c338106
--- /dev/null
+++ b/backend/decky_loader/plugin/plugin.py
@@ -0,0 +1,84 @@
+from asyncio import Task, create_task
+from json import dumps, load, loads
+from logging import getLogger
+from os import path
+from multiprocessing import Process
+
+from .sandboxed_plugin import SandboxedPlugin
+from .method_call_request import MethodCallRequest
+from ..localplatform.localsocket import LocalSocket
+
+from typing import Any, Callable, Coroutine, Dict
+
+class PluginWrapper:
+ def __init__(self, file: str, plugin_directory: str, plugin_path: str) -> None:
+ self.file = file
+ self.plugin_path = plugin_path
+ self.plugin_directory = plugin_directory
+
+ self.version = None
+
+ json = load(open(path.join(plugin_path, plugin_directory, "plugin.json"), "r", encoding="utf-8"))
+ if path.isfile(path.join(plugin_path, plugin_directory, "package.json")):
+ package_json = load(open(path.join(plugin_path, plugin_directory, "package.json"), "r", encoding="utf-8"))
+ self.version = package_json["version"]
+
+ self.name = json["name"]
+ self.author = json["author"]
+ self.flags = json["flags"]
+
+ self.passive = not path.isfile(self.file)
+
+ self.log = getLogger("plugin")
+
+ self.sandboxed_plugin = SandboxedPlugin(self.name, self.passive, self.flags, self.file, self.plugin_directory, self.plugin_path, self.version, self.author)
+ #TODO: Maybe make LocalSocket not require on_new_message to make this cleaner
+ self._socket = LocalSocket(self.sandboxed_plugin.on_new_message)
+ self._listener_task: Task[Any]
+ self._method_call_requests: Dict[str, MethodCallRequest] = {}
+
+ self.emitted_message_callback: Callable[[Dict[Any, Any]], Coroutine[Any, Any, Any]]
+
+ def __str__(self) -> str:
+ return self.name
+
+ async def _response_listener(self):
+ while True:
+ try:
+ line = await self._socket.read_single_line()
+ if line != None:
+ res = loads(line)
+ if res["id"] == "0":
+ create_task(self.emitted_message_callback(res["payload"]))
+ else:
+ self._method_call_requests.pop(res["id"]).set_result(res)
+ except:
+ pass
+
+ def set_emitted_message_callback(self, callback: Callable[[Dict[Any, Any]], Coroutine[Any, Any, Any]]):
+ self.emitted_message_callback = callback
+
+ async def execute_method(self, method_name: str, kwargs: Dict[Any, Any]):
+ if self.passive:
+ raise RuntimeError("This plugin is passive (aka does not implement main.py)")
+
+ request = MethodCallRequest()
+ await self._socket.get_socket_connection()
+ await self._socket.write_single_line(dumps({ "method": method_name, "args": kwargs, "id": request.id }, ensure_ascii=False))
+ self._method_call_requests[request.id] = request
+
+ return await request.wait_for_result()
+
+ def start(self):
+ if self.passive:
+ return self
+ Process(target=self.sandboxed_plugin.initialize, args=[self._socket]).start()
+ self._listener_task = create_task(self._response_listener())
+ return self
+
+ def stop(self):
+ self._listener_task.cancel()
+ async def _(self: PluginWrapper):
+ await self._socket.write_single_line(dumps({ "stop": True }, ensure_ascii=False))
+ await self._socket.close_socket_connection()
+ create_task(_(self)) \ No newline at end of file
diff --git a/backend/decky_loader/plugin/sandboxed_plugin.py b/backend/decky_loader/plugin/sandboxed_plugin.py
new file mode 100644
index 00000000..6be97b4a
--- /dev/null
+++ b/backend/decky_loader/plugin/sandboxed_plugin.py
@@ -0,0 +1,138 @@
+from os import path, environ
+from signal import SIGINT, signal
+from importlib.util import module_from_spec, spec_from_file_location
+from json import dumps, loads
+from logging import getLogger
+from sys import exit, path as syspath, modules as sysmodules
+from traceback import format_exc
+from asyncio import (get_event_loop, new_event_loop,
+ set_event_loop, sleep)
+
+from .method_call_request import SocketResponseDict
+from ..localplatform.localsocket import LocalSocket
+from ..localplatform.localplatform import setgid, setuid, get_username, get_home_path
+from ..customtypes import UserType
+from .. import helpers
+
+from typing import Any, Dict, List
+
+class SandboxedPlugin:
+ def __init__(self,
+ name: str,
+ passive: bool,
+ flags: List[str],
+ file: str,
+ plugin_directory: str,
+ plugin_path: str,
+ version: str|None,
+ author: str) -> None:
+ self.name = name
+ self.passive = passive
+ self.flags = flags
+ self.file = file
+ self.plugin_path = plugin_path
+ self.plugin_directory = plugin_directory
+ self.version = version
+ self.author = author
+
+ self.log = getLogger("plugin")
+
+ def initialize(self, socket: LocalSocket):
+ self._socket = socket
+
+ try:
+ signal(SIGINT, lambda s, f: exit(0))
+
+ set_event_loop(new_event_loop())
+ if self.passive:
+ return
+ setgid(UserType.ROOT if "root" in self.flags else UserType.HOST_USER)
+ setuid(UserType.ROOT if "root" in self.flags else UserType.HOST_USER)
+ # export a bunch of environment variables to help plugin developers
+ environ["HOME"] = get_home_path(UserType.ROOT if "root" in self.flags else UserType.HOST_USER)
+ environ["USER"] = "root" if "root" in self.flags else get_username()
+ environ["DECKY_VERSION"] = helpers.get_loader_version()
+ environ["DECKY_USER"] = get_username()
+ environ["DECKY_USER_HOME"] = helpers.get_home_path()
+ environ["DECKY_HOME"] = helpers.get_homebrew_path()
+ environ["DECKY_PLUGIN_SETTINGS_DIR"] = path.join(environ["DECKY_HOME"], "settings", self.plugin_directory)
+ helpers.mkdir_as_user(path.join(environ["DECKY_HOME"], "settings"))
+ helpers.mkdir_as_user(environ["DECKY_PLUGIN_SETTINGS_DIR"])
+ environ["DECKY_PLUGIN_RUNTIME_DIR"] = path.join(environ["DECKY_HOME"], "data", self.plugin_directory)
+ helpers.mkdir_as_user(path.join(environ["DECKY_HOME"], "data"))
+ helpers.mkdir_as_user(environ["DECKY_PLUGIN_RUNTIME_DIR"])
+ environ["DECKY_PLUGIN_LOG_DIR"] = path.join(environ["DECKY_HOME"], "logs", self.plugin_directory)
+ helpers.mkdir_as_user(path.join(environ["DECKY_HOME"], "logs"))
+ helpers.mkdir_as_user(environ["DECKY_PLUGIN_LOG_DIR"])
+ environ["DECKY_PLUGIN_DIR"] = path.join(self.plugin_path, self.plugin_directory)
+ environ["DECKY_PLUGIN_NAME"] = self.name
+ if self.version:
+ environ["DECKY_PLUGIN_VERSION"] = self.version
+ environ["DECKY_PLUGIN_AUTHOR"] = self.author
+
+ # append the plugin's `py_modules` to the recognized python paths
+ syspath.append(path.join(environ["DECKY_PLUGIN_DIR"], "py_modules"))
+
+ #TODO: FIX IN A LESS CURSED WAY
+ keys = [key for key in sysmodules if key.startswith("decky_loader.")]
+ for key in keys:
+ sysmodules[key.replace("decky_loader.", "")] = sysmodules[key]
+
+ spec = spec_from_file_location("_", self.file)
+ assert spec is not None
+ module = module_from_spec(spec)
+ assert spec.loader is not None
+ spec.loader.exec_module(module)
+ self.Plugin = module.Plugin
+
+ setattr(self.Plugin, "emit_message", self.emit_message)
+ #TODO: Find how to put emit_message on global namespace so it doesn't pollute Plugin
+
+ if hasattr(self.Plugin, "_migration"):
+ get_event_loop().run_until_complete(self.Plugin._migration(self.Plugin))
+ if hasattr(self.Plugin, "_main"):
+ get_event_loop().create_task(self.Plugin._main(self.Plugin))
+ get_event_loop().create_task(socket.setup_server())
+ get_event_loop().run_forever()
+ except:
+ self.log.error("Failed to start " + self.name + "!\n" + format_exc())
+ exit(0)
+
+ async def _unload(self):
+ try:
+ self.log.info("Attempting to unload with plugin " + self.name + "'s \"_unload\" function.\n")
+ if hasattr(self.Plugin, "_unload"):
+ await self.Plugin._unload(self.Plugin)
+ self.log.info("Unloaded " + self.name + "\n")
+ else:
+ self.log.info("Could not find \"_unload\" in " + self.name + "'s main.py" + "\n")
+ except:
+ self.log.error("Failed to unload " + self.name + "!\n" + format_exc())
+ exit(0)
+
+ async def on_new_message(self, message : str) -> str|None:
+ data = loads(message)
+
+ if "stop" in data:
+ self.log.info("Calling Loader unload function.")
+ await self._unload()
+ get_event_loop().stop()
+ while get_event_loop().is_running():
+ await sleep(0)
+ get_event_loop().close()
+ raise Exception("Closing message listener")
+
+ d: SocketResponseDict = {"res": None, "success": True, "id": data["id"]}
+ try:
+ d["res"] = await getattr(self.Plugin, data["method"])(self.Plugin, **data["args"])
+ except Exception as e:
+ d["res"] = str(e)
+ d["success"] = False
+ finally:
+ return dumps(d, ensure_ascii=False)
+
+ async def emit_message(self, message: Dict[Any, Any]):
+ await self._socket.write_single_line_server(dumps({
+ "id": "0",
+ "payload": message
+ })) \ No newline at end of file
diff --git a/backend/decky_loader/settings.py b/backend/decky_loader/settings.py
new file mode 100644
index 00000000..c0f2b90c
--- /dev/null
+++ b/backend/decky_loader/settings.py
@@ -0,0 +1,60 @@
+from json import dump, load
+from os import mkdir, path, listdir, rename
+from typing import Any, Dict
+from .localplatform.localplatform import chown, folder_owner, get_chown_plugin_path
+from .customtypes import UserType
+
+from .helpers import get_homebrew_path
+
+
+class SettingsManager:
+ def __init__(self, name: str, settings_directory: str | None = None) -> None:
+ wrong_dir = get_homebrew_path()
+ if settings_directory == None:
+ settings_directory = path.join(wrong_dir, "settings")
+
+ self.path = path.join(settings_directory, name + ".json")
+
+ #Create the folder with the correct permission
+ if not path.exists(settings_directory):
+ mkdir(settings_directory)
+
+ #Copy all old settings file in the root directory to the correct folder
+ for file in listdir(wrong_dir):
+ if file.endswith(".json"):
+ rename(path.join(wrong_dir,file),
+ path.join(settings_directory, file))
+ self.path = path.join(settings_directory, name + ".json")
+
+
+ #If the owner of the settings directory is not the user, then set it as the user:
+ expected_user = UserType.HOST_USER if get_chown_plugin_path() else UserType.ROOT
+ if folder_owner(settings_directory) != expected_user:
+ chown(settings_directory, expected_user, False)
+
+ self.settings: Dict[str, Any] = {}
+
+ try:
+ open(self.path, "x", encoding="utf-8")
+ except FileExistsError as _:
+ self.read()
+ pass
+
+ def read(self):
+ try:
+ with open(self.path, "r", encoding="utf-8") as file:
+ self.settings = load(file)
+ except Exception as e:
+ print(e)
+ pass
+
+ def commit(self):
+ with open(self.path, "w+", encoding="utf-8") as file:
+ dump(self.settings, file, indent=4, ensure_ascii=False)
+
+ def getSetting(self, key: str, default: Any = None) -> Any:
+ return self.settings.get(key, default)
+
+ def setSetting(self, key: str, value: Any) -> Any:
+ self.settings[key] = value
+ self.commit()
diff --git a/backend/decky_loader/updater.py b/backend/decky_loader/updater.py
new file mode 100644
index 00000000..f8aef429
--- /dev/null
+++ b/backend/decky_loader/updater.py
@@ -0,0 +1,238 @@
+from __future__ import annotations
+import os
+import shutil
+from asyncio import sleep
+from json.decoder import JSONDecodeError
+from logging import getLogger
+from os import getcwd, path, remove
+from typing import TYPE_CHECKING, List, TypedDict
+if TYPE_CHECKING:
+ from .main import PluginManager
+from .localplatform.localplatform import chmod, service_restart, ON_LINUX, get_keep_systemd_service, get_selinux
+
+from aiohttp import ClientSession, web
+
+from . import helpers
+from .injector import get_gamepadui_tab
+from .settings import SettingsManager
+
+logger = getLogger("Updater")
+
+class RemoteVerAsset(TypedDict):
+ name: str
+ browser_download_url: str
+class RemoteVer(TypedDict):
+ tag_name: str
+ prerelease: bool
+ assets: List[RemoteVerAsset]
+
+class Updater:
+ def __init__(self, context: PluginManager) -> None:
+ self.context = context
+ self.settings = self.context.settings
+ # Exposes updater methods to frontend
+ self.updater_methods = {
+ "get_branch": self._get_branch,
+ "get_version": self.get_version,
+ "do_update": self.do_update,
+ "do_restart": self.do_restart,
+ "check_for_updates": self.check_for_updates
+ }
+ self.remoteVer: RemoteVer | None = None
+ self.allRemoteVers: List[RemoteVer] = []
+ self.localVer = helpers.get_loader_version()
+
+ try:
+ self.currentBranch = self.get_branch(self.context.settings)
+ except:
+ self.currentBranch = 0
+ logger.error("Current branch could not be determined, defaulting to \"Stable\"")
+
+ if context:
+ context.web_app.add_routes([
+ web.post("/updater/{method_name}", self._handle_server_method_call)
+ ])
+ context.loop.create_task(self.version_reloader())
+
+ async def _handle_server_method_call(self, request: web.Request):
+ method_name = request.match_info["method_name"]
+ try:
+ args = await request.json()
+ except JSONDecodeError:
+ args = {}
+ res = {}
+ try:
+ r = await self.updater_methods[method_name](**args) # type: ignore
+ res["result"] = r
+ res["success"] = True
+ except Exception as e:
+ res["result"] = str(e)
+ res["success"] = False
+ return web.json_response(res)
+
+ def get_branch(self, manager: SettingsManager):
+ ver = manager.getSetting("branch", -1)
+ logger.debug("current branch: %i" % ver)
+ if ver == -1:
+ logger.info("Current branch is not set, determining branch from version...")
+ if self.localVer.startswith("v") and "-pre" in self.localVer:
+ logger.info("Current version determined to be pre-release")
+ manager.setSetting('branch', 1)
+ return 1
+ else:
+ logger.info("Current version determined to be stable")
+ manager.setSetting('branch', 0)
+ return 0
+ return ver
+
+ async def _get_branch(self, manager: SettingsManager):
+ return self.get_branch(manager)
+
+ # retrieve relevant service file's url for each branch
+ def get_service_url(self):
+ logger.debug("Getting service URL")
+ branch = self.get_branch(self.context.settings)
+ match branch:
+ case 0:
+ url = "https://raw.githubusercontent.com/SteamDeckHomebrew/decky-loader/main/dist/plugin_loader-release.service"
+ case 1 | 2:
+ url = "https://raw.githubusercontent.com/SteamDeckHomebrew/decky-loader/main/dist/plugin_loader-prerelease.service"
+ case _:
+ logger.error("You have an invalid branch set... Defaulting to prerelease service, please send the logs to the devs!")
+ url = "https://raw.githubusercontent.com/SteamDeckHomebrew/decky-loader/main/dist/plugin_loader-prerelease.service"
+ return str(url)
+
+ async def get_version(self):
+ return {
+ "current": self.localVer,
+ "remote": self.remoteVer,
+ "all": self.allRemoteVers,
+ "updatable": self.localVer != "unknown"
+ }
+
+ async def check_for_updates(self):
+ logger.debug("checking for updates")
+ selectedBranch = self.get_branch(self.context.settings)
+ async with ClientSession() as web:
+ async with web.request("GET", "https://api.github.com/repos/SteamDeckHomebrew/decky-loader/releases", ssl=helpers.get_ssl_context()) as res:
+ remoteVersions: List[RemoteVer] = await res.json()
+ if selectedBranch == 0:
+ logger.debug("release type: release")
+ remoteVersions = list(filter(lambda ver: ver["tag_name"].startswith("v") and not ver["prerelease"] and not ver["tag_name"].find("-pre") > 0 and ver["tag_name"], remoteVersions))
+ elif selectedBranch == 1:
+ logger.debug("release type: pre-release")
+ remoteVersions = list(filter(lambda ver:ver["tag_name"].startswith("v"), remoteVersions))
+ else:
+ logger.error("release type: NOT FOUND")
+ raise ValueError("no valid branch found")
+ self.allRemoteVers = remoteVersions
+ logger.debug("determining release type to find, branch is %i" % selectedBranch)
+ if selectedBranch == 0:
+ logger.debug("release type: release")
+ self.remoteVer = next(filter(lambda ver: ver["tag_name"].startswith("v") and not ver["prerelease"] and not ver["tag_name"].find("-pre") > 0 and ver["tag_name"], remoteVersions), None)
+ elif selectedBranch == 1:
+ logger.debug("release type: pre-release")
+ self.remoteVer = next(filter(lambda ver:ver["tag_name"].startswith("v"), remoteVersions), None)
+ else:
+ logger.error("release type: NOT FOUND")
+ raise ValueError("no valid branch found")
+ logger.info("Updated remote version information")
+ tab = await get_gamepadui_tab()
+ await tab.evaluate_js(f"window.DeckyPluginLoader.notifyUpdates()", False, True, False)
+ return await self.get_version()
+
+ async def version_reloader(self):
+ await sleep(30)
+ while True:
+ try:
+ await self.check_for_updates()
+ except:
+ pass
+ await sleep(60 * 60 * 6) # 6 hours
+
+ async def do_update(self):
+ logger.debug("Starting update.")
+ try:
+ assert self.remoteVer
+ except AssertionError:
+ logger.error("Unable to update as remoteVer is missing")
+ return
+
+ version = self.remoteVer["tag_name"]
+ download_url = None
+ download_filename = "PluginLoader" if ON_LINUX else "PluginLoader.exe"
+ download_temp_filename = download_filename + ".new"
+
+ for x in self.remoteVer["assets"]:
+ if x["name"] == download_filename:
+ download_url = x["browser_download_url"]
+ break
+
+ if download_url == None:
+ raise Exception("Download url not found")
+
+ service_url = self.get_service_url()
+ logger.debug("Retrieved service URL")
+
+ tab = await get_gamepadui_tab()
+ await tab.open_websocket()
+ async with ClientSession() as web:
+ if ON_LINUX and not get_keep_systemd_service():
+ logger.debug("Downloading systemd service")
+ # download the relevant systemd service depending upon branch
+ async with web.request("GET", service_url, ssl=helpers.get_ssl_context(), allow_redirects=True) as res:
+ logger.debug("Downloading service file")
+ data = await res.content.read()
+ logger.debug(str(data))
+ service_file_path = path.join(getcwd(), "plugin_loader.service")
+ try:
+ with open(path.join(getcwd(), "plugin_loader.service"), "wb") as out:
+ out.write(data)
+ except Exception as e:
+ logger.error(f"Error at %s", exc_info=e)
+ with open(path.join(getcwd(), "plugin_loader.service"), "r", encoding="utf-8") as service_file:
+ service_data = service_file.read()
+ service_data = service_data.replace("${HOMEBREW_FOLDER}", helpers.get_homebrew_path())
+ with open(path.join(getcwd(), "plugin_loader.service"), "w", encoding="utf-8") as service_file:
+ service_file.write(service_data)
+
+ logger.debug("Saved service file")
+ logger.debug("Copying service file over current file.")
+ shutil.copy(service_file_path, "/etc/systemd/system/plugin_loader.service")
+ if not os.path.exists(path.join(getcwd(), ".systemd")):
+ os.mkdir(path.join(getcwd(), ".systemd"))
+ shutil.move(service_file_path, path.join(getcwd(), ".systemd")+"/plugin_loader.service")
+
+ logger.debug("Downloading binary")
+ async with web.request("GET", download_url, ssl=helpers.get_ssl_context(), allow_redirects=True) as res:
+ total = int(res.headers.get('content-length', 0))
+ with open(path.join(getcwd(), download_temp_filename), "wb") as out:
+ progress = 0
+ raw = 0
+ async for c in res.content.iter_chunked(512):
+ out.write(c)
+ raw += len(c)
+ new_progress = round((raw / total) * 100)
+ if progress != new_progress:
+ self.context.loop.create_task(tab.evaluate_js(f"window.DeckyUpdater.updateProgress({new_progress})", False, False, False))
+ progress = new_progress
+
+ with open(path.join(getcwd(), ".loader.version"), "w", encoding="utf-8") as out:
+ out.write(version)
+
+ if ON_LINUX:
+ remove(path.join(getcwd(), download_filename))
+ shutil.move(path.join(getcwd(), download_temp_filename), path.join(getcwd(), download_filename))
+ chmod(path.join(getcwd(), download_filename), 777, False)
+ if get_selinux():
+ from asyncio.subprocess import create_subprocess_exec
+ process = await create_subprocess_exec("chcon", "-t", "bin_t", path.join(getcwd(), download_filename))
+ logger.info(f"Setting the executable flag with chcon returned {await process.wait()}")
+
+ logger.info("Updated loader installation.")
+ await tab.evaluate_js("window.DeckyUpdater.finish()", False, False)
+ await self.do_restart()
+ await tab.close_websocket()
+
+ async def do_restart(self):
+ await service_restart("plugin_loader")
diff --git a/backend/decky_loader/utilities.py b/backend/decky_loader/utilities.py
new file mode 100644
index 00000000..f04ed371
--- /dev/null
+++ b/backend/decky_loader/utilities.py
@@ -0,0 +1,373 @@
+from __future__ import annotations
+from os import stat_result
+import uuid
+from json.decoder import JSONDecodeError
+from os.path import splitext
+import re
+from traceback import format_exc
+from stat import FILE_ATTRIBUTE_HIDDEN # type: ignore
+
+from asyncio import StreamReader, StreamWriter, start_server, gather, open_connection
+from aiohttp import ClientSession, web
+from typing import TYPE_CHECKING, Callable, Coroutine, Dict, Any, List, TypedDict
+
+from logging import getLogger
+from pathlib import Path
+
+from .browser import PluginInstallRequest, PluginInstallType
+if TYPE_CHECKING:
+ from .main import PluginManager
+from .injector import inject_to_tab, get_gamepadui_tab, close_old_tabs, get_tab
+from .localplatform.localplatform import ON_WINDOWS
+from . import helpers
+from .localplatform.localplatform import service_stop, service_start, get_home_path, get_username
+
+class FilePickerObj(TypedDict):
+ file: Path
+ filest: stat_result
+ is_dir: bool
+
+class Utilities:
+ def __init__(self, context: PluginManager) -> None:
+ self.context = context
+ self.util_methods: Dict[str, Callable[..., Coroutine[Any, Any, Any]]] = {
+ "ping": self.ping,
+ "http_request": self.http_request,
+ "install_plugin": self.install_plugin,
+ "install_plugins": self.install_plugins,
+ "cancel_plugin_install": self.cancel_plugin_install,
+ "confirm_plugin_install": self.confirm_plugin_install,
+ "uninstall_plugin": self.uninstall_plugin,
+ "execute_in_tab": self.execute_in_tab,
+ "inject_css_into_tab": self.inject_css_into_tab,
+ "remove_css_from_tab": self.remove_css_from_tab,
+ "allow_remote_debugging": self.allow_remote_debugging,
+ "disallow_remote_debugging": self.disallow_remote_debugging,
+ "set_setting": self.set_setting,
+ "get_setting": self.get_setting,
+ "filepicker_ls": self.filepicker_ls,
+ "disable_rdt": self.disable_rdt,
+ "enable_rdt": self.enable_rdt,
+ "get_tab_id": self.get_tab_id,
+ "get_user_info": self.get_user_info,
+ }
+
+ self.logger = getLogger("Utilities")
+
+ self.rdt_proxy_server = None
+ self.rdt_script_id = None
+ self.rdt_proxy_task = None
+
+ if context:
+ context.web_app.add_routes([
+ web.post("/methods/{method_name}", self._handle_server_method_call)
+ ])
+
+ async def _handle_server_method_call(self, request: web.Request):
+ method_name = request.match_info["method_name"]
+ try:
+ args = await request.json()
+ except JSONDecodeError:
+ args = {}
+ res = {}
+ try:
+ r = await self.util_methods[method_name](**args)
+ res["result"] = r
+ res["success"] = True
+ except Exception as e:
+ res["result"] = str(e)
+ res["success"] = False
+ return web.json_response(res)
+
+ async def install_plugin(self, artifact: str="", name: str="No name", version: str="dev", hash: str="", install_type: PluginInstallType=PluginInstallType.INSTALL):
+ return await self.context.plugin_browser.request_plugin_install(
+ artifact=artifact,
+ name=name,
+ version=version,
+ hash=hash,
+ install_type=install_type
+ )
+
+ async def install_plugins(self, requests: List[PluginInstallRequest]):
+ return await self.context.plugin_browser.request_multiple_plugin_installs(
+ requests=requests
+ )
+
+ async def confirm_plugin_install(self, request_id: str):
+ return await self.context.plugin_browser.confirm_plugin_install(request_id)
+
+ async def cancel_plugin_install(self, request_id: str):
+ return self.context.plugin_browser.cancel_plugin_install(request_id)
+
+ async def uninstall_plugin(self, name: str):
+ return await self.context.plugin_browser.uninstall_plugin(name)
+
+ async def http_request(self, method: str="", url: str="", **kwargs: Any):
+ async with ClientSession() as web:
+ res = await web.request(method, url, ssl=helpers.get_ssl_context(), **kwargs)
+ text = await res.text()
+ return {
+ "status": res.status,
+ "headers": dict(res.headers),
+ "body": text
+ }
+
+ async def ping(self, **kwargs: Any):
+ return "pong"
+
+ async def execute_in_tab(self, tab: str, run_async: bool, code: str):
+ try:
+ result = await inject_to_tab(tab, code, run_async)
+ assert result
+ if "exceptionDetails" in result["result"]:
+ return {
+ "success": False,
+ "result": result["result"]
+ }
+
+ return {
+ "success": True,
+ "result": result["result"]["result"].get("value")
+ }
+ except Exception as e:
+ return {
+ "success": False,
+ "result": e
+ }
+
+ async def inject_css_into_tab(self, tab: str, style: str):
+ try:
+ css_id = str(uuid.uuid4())
+
+ result = await inject_to_tab(tab,
+ f"""
+ (function() {{
+ const style = document.createElement('style');
+ style.id = "{css_id}";
+ document.head.append(style);
+ style.textContent = `{style}`;
+ }})()
+ """, False)
+
+ if result and "exceptionDetails" in result["result"]:
+ return {
+ "success": False,
+ "result": result["result"]
+ }
+
+ return {
+ "success": True,
+ "result": css_id
+ }
+ except Exception as e:
+ return {
+ "success": False,
+ "result": e
+ }
+
+ async def remove_css_from_tab(self, tab: str, css_id: str):
+ try:
+ result = await inject_to_tab(tab,
+ f"""
+ (function() {{
+ let style = document.getElementById("{css_id}");
+
+ if (style.nodeName.toLowerCase() == 'style')
+ style.parentNode.removeChild(style);
+ }})()
+ """, False)
+
+ if result and "exceptionDetails" in result["result"]:
+ return {
+ "success": False,
+ "result": result
+ }
+
+ return {
+ "success": True
+ }
+ except Exception as e:
+ return {
+ "success": False,
+ "result": e
+ }
+
+ async def get_setting(self, key: str, default: Any):
+ return self.context.settings.getSetting(key, default)
+
+ async def set_setting(self, key: str, value: Any):
+ return self.context.settings.setSetting(key, value)
+
+ async def allow_remote_debugging(self):
+ await service_start(helpers.REMOTE_DEBUGGER_UNIT)
+ return True
+
+ async def disallow_remote_debugging(self):
+ await service_stop(helpers.REMOTE_DEBUGGER_UNIT)
+ return True
+
+ async def filepicker_ls(self,
+ path : str | None = None,
+ include_files: bool = True,
+ include_folders: bool = True,
+ include_ext: list[str] = [],
+ include_hidden: bool = False,
+ order_by: str = "name_asc",
+ filter_for: str | None = None,
+ page: int = 1,
+ max: int = 1000):
+
+ if path == None:
+ path = get_home_path()
+
+ path_obj = Path(path).resolve()
+
+ files: List[FilePickerObj] = []
+ folders: List[FilePickerObj] = []
+
+ #Resolving all files/folders in the requested directory
+ for file in path_obj.iterdir():
+ if file.exists():
+ filest = file.stat()
+ is_hidden = file.name.startswith('.')
+ if ON_WINDOWS and not is_hidden:
+ is_hidden = bool(filest.st_file_attributes & FILE_ATTRIBUTE_HIDDEN) # type: ignore
+ if include_folders and file.is_dir():
+ if (is_hidden and include_hidden) or not is_hidden:
+ folders.append({"file": file, "filest": filest, "is_dir": True})
+ elif include_files:
+ # Handle requested extensions if present
+ if len(include_ext) == 0 or 'all_files' in include_ext \
+ or splitext(file.name)[1].lstrip('.') in include_ext:
+ if (is_hidden and include_hidden) or not is_hidden:
+ files.append({"file": file, "filest": filest, "is_dir": False})
+ # Filter logic
+ if filter_for is not None:
+ try:
+ if re.compile(filter_for):
+ files = list(filter(lambda file: re.search(filter_for, file["file"].name) != None, files))
+ except re.error:
+ files = list(filter(lambda file: file["file"].name.find(filter_for) != -1, files))
+
+ # Ordering logic
+ ord_arg = order_by.split("_")
+ ord = ord_arg[0]
+ rev = True if ord_arg[1] == "asc" else False
+ match ord:
+ case 'name':
+ files.sort(key=lambda x: x['file'].name.casefold(), reverse = rev)
+ folders.sort(key=lambda x: x['file'].name.casefold(), reverse = rev)
+ case 'modified':
+ files.sort(key=lambda x: x['filest'].st_mtime, reverse = not rev)
+ folders.sort(key=lambda x: x['filest'].st_mtime, reverse = not rev)
+ case 'created':
+ files.sort(key=lambda x: x['filest'].st_ctime, reverse = not rev)
+ folders.sort(key=lambda x: x['filest'].st_ctime, reverse = not rev)
+ case 'size':
+ files.sort(key=lambda x: x['filest'].st_size, reverse = not rev)
+ # Folders has no file size, order by name instead
+ folders.sort(key=lambda x: x['file'].name.casefold())
+ case _:
+ files.sort(key=lambda x: x['file'].name.casefold(), reverse = rev)
+ folders.sort(key=lambda x: x['file'].name.casefold(), reverse = rev)
+
+ #Constructing the final file list, folders first
+ all = [{
+ "isdir": x['is_dir'],
+ "name": str(x['file'].name),
+ "realpath": str(x['file']),
+ "size": x['filest'].st_size,
+ "modified": x['filest'].st_mtime,
+ "created": x['filest'].st_ctime,
+ } for x in folders + files ]
+
+ return {
+ "realpath": str(path),
+ "files": all[(page-1)*max:(page)*max],
+ "total": len(all),
+ }
+
+
+ # Based on https://stackoverflow.com/a/46422554/13174603
+ def start_rdt_proxy(self, ip: str, port: int):
+ async def pipe(reader: StreamReader, writer: StreamWriter):
+ try:
+ while not reader.at_eof():
+ writer.write(await reader.read(2048))
+ finally:
+ writer.close()
+ async def handle_client(local_reader: StreamReader, local_writer: StreamWriter):
+ try:
+ remote_reader, remote_writer = await open_connection(
+ ip, port)
+ pipe1 = pipe(local_reader, remote_writer)
+ pipe2 = pipe(remote_reader, local_writer)
+ await gather(pipe1, pipe2)
+ finally:
+ local_writer.close()
+
+ self.rdt_proxy_server = start_server(handle_client, "127.0.0.1", port)
+ self.rdt_proxy_task = self.context.loop.create_task(self.rdt_proxy_server)
+
+ def stop_rdt_proxy(self):
+ if self.rdt_proxy_server != None:
+ self.rdt_proxy_server.close()
+ if self.rdt_proxy_task:
+ self.rdt_proxy_task.cancel()
+
+ async def _enable_rdt(self):
+ # TODO un-hardcode port
+ try:
+ self.stop_rdt_proxy()
+ ip = self.context.settings.getSetting("developer.rdt.ip", None)
+
+ if ip != None:
+ self.logger.info("Connecting to React DevTools at " + ip)
+ async with ClientSession() as web:
+ res = await web.request("GET", "http://" + ip + ":8097", ssl=helpers.get_ssl_context())
+ script = """
+ if (!window.deckyHasConnectedRDT) {
+ window.deckyHasConnectedRDT = true;
+ // This fixes the overlay when hovering over an element in RDT
+ Object.defineProperty(window, '__REACT_DEVTOOLS_TARGET_WINDOW__', {
+ enumerable: true,
+ configurable: true,
+ get: function() {
+ return (GamepadNavTree?.m_context?.m_controller || FocusNavController)?.m_ActiveContext?.ActiveWindow || window;
+ }
+ });
+ """ + await res.text() + "\n}"
+ if res.status != 200:
+ self.logger.error("Failed to connect to React DevTools at " + ip)
+ return False
+ self.start_rdt_proxy(ip, 8097)
+ self.logger.info("Connected to React DevTools, loading script")
+ tab = await get_gamepadui_tab()
+ # RDT needs to load before React itself to work.
+ await close_old_tabs()
+ result = await tab.reload_and_evaluate(script)
+ self.logger.info(result)
+
+ except Exception:
+ self.logger.error("Failed to connect to React DevTools")
+ self.logger.error(format_exc())
+
+ async def enable_rdt(self):
+ self.context.loop.create_task(self._enable_rdt())
+
+ async def disable_rdt(self):
+ self.logger.info("Disabling React DevTools")
+ tab = await get_gamepadui_tab()
+ self.rdt_script_id = None
+ await close_old_tabs()
+ await tab.evaluate_js("location.reload();", False, True, False)
+ self.logger.info("React DevTools disabled")
+
+ async def get_user_info(self) -> Dict[str, str]:
+ return {
+ "username": get_username(),
+ "path": get_home_path()
+ }
+
+ async def get_tab_id(self, name: str):
+ return (await get_tab(name)).id