summaryrefslogtreecommitdiff
path: root/backend/src
diff options
context:
space:
mode:
authorK900 <me@0upti.me>2023-11-14 00:40:37 +0300
committerGitHub <noreply@github.com>2023-11-13 23:40:37 +0200
commit5a633fdd8284dd1a2b6f3c95806f033ef4a4becf (patch)
treeb89f3660d3b8918484e6bc153003a84b95207045 /backend/src
parent8ce4a7679e9f0abb67e85822fefe08237ec9d82e (diff)
downloaddecky-loader-5a633fdd8284dd1a2b6f3c95806f033ef4a4becf.tar.gz
decky-loader-5a633fdd8284dd1a2b6f3c95806f033ef4a4becf.zip
* fix: get rid of title view jank on latest beta * Count the number of installs for each plugin (#557) * Bump aiohttp from 3.8.4 to 3.8.5 in /backend (#558) * fix: include Decky version in request for index.js This avoids the If-Modified-Since logic in aiohttp and ensures Steam doesn't cache old JS, even if the timestamps are normalized. * fix: clean up shellcheck warnings in act runner script * fix: gitignore settings/ * fix: ensure state directories exist when running without the installer * feat: determine root directory correctly when running from in-tree * fix: fix typo in CI script * refactor: build a proper Python package with poetry * refactor: move decky_plugin under the poetry structure There's no need to special case it anymore, just treat it like any other Python module. * sandboxed_plugin: better fix, attempt 2 --------- Co-authored-by: AAGaming <aagaming@riseup.net> Co-authored-by: Party Wumpus <48649272+PartyWumpus@users.noreply.github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Diffstat (limited to 'backend/src')
-rw-r--r--backend/src/browser.py275
-rw-r--r--backend/src/customtypes.py6
-rw-r--r--backend/src/helpers.py153
-rw-r--r--backend/src/injector.py438
-rw-r--r--backend/src/loader.py200
-rw-r--r--backend/src/localplatform/localplatform.py52
-rw-r--r--backend/src/localplatform/localplatformlinux.py192
-rw-r--r--backend/src/localplatform/localplatformwin.py53
-rw-r--r--backend/src/localplatform/localsocket.py145
-rw-r--r--backend/src/main.py191
-rw-r--r--backend/src/plugin/method_call_request.py29
-rw-r--r--backend/src/plugin/plugin.py84
-rw-r--r--backend/src/plugin/sandboxed_plugin.py138
-rw-r--r--backend/src/settings.py60
-rw-r--r--backend/src/updater.py238
-rw-r--r--backend/src/utilities.py373
16 files changed, 0 insertions, 2627 deletions
diff --git a/backend/src/browser.py b/backend/src/browser.py
deleted file mode 100644
index 7260db8e..00000000
--- a/backend/src/browser.py
+++ /dev/null
@@ -1,275 +0,0 @@
-# Full imports
-import json
-# import pprint
-# from pprint import pformat
-
-# Partial imports
-from aiohttp import ClientSession
-from asyncio import sleep
-from hashlib import sha256
-from io import BytesIO
-from logging import getLogger
-from os import R_OK, W_OK, path, listdir, access, mkdir
-from shutil import rmtree
-from time import time
-from zipfile import ZipFile
-from enum import IntEnum
-from typing import Dict, List, TypedDict
-
-# Local modules
-from .localplatform.localplatform import chown, chmod
-from .loader import Loader, Plugins
-from .helpers import get_ssl_context, download_remote_binary_to_path
-from .settings import SettingsManager
-from .injector import get_gamepadui_tab
-
-logger = getLogger("Browser")
-
-class PluginInstallType(IntEnum):
- INSTALL = 0
- REINSTALL = 1
- UPDATE = 2
-
-class PluginInstallRequest(TypedDict):
- name: str
- artifact: str
- version: str
- hash: str
- install_type: PluginInstallType
-
-class PluginInstallContext:
- def __init__(self, artifact: str, name: str, version: str, hash: str) -> None:
- self.artifact = artifact
- self.name = name
- self.version = version
- self.hash = hash
-
-class PluginBrowser:
- def __init__(self, plugin_path: str, plugins: Plugins, loader: Loader, settings: SettingsManager) -> None:
- self.plugin_path = plugin_path
- self.plugins = plugins
- self.loader = loader
- self.settings = settings
- self.install_requests: Dict[str, PluginInstallContext | List[PluginInstallContext]] = {}
-
- def _unzip_to_plugin_dir(self, zip: BytesIO, name: str, hash: str):
- zip_hash = sha256(zip.getbuffer()).hexdigest()
- if hash and (zip_hash != hash):
- return False
- zip_file = ZipFile(zip)
- zip_file.extractall(self.plugin_path)
- plugin_folder = self.find_plugin_folder(name)
- assert plugin_folder is not None
- plugin_dir = path.join(self.plugin_path, plugin_folder)
-
- if not chown(plugin_dir) or not chmod(plugin_dir, 555):
- logger.error(f"chown/chmod exited with a non-zero exit code")
- return False
- return True
-
- async def _download_remote_binaries_for_plugin_with_name(self, pluginBasePath: str):
- rv = False
- try:
- packageJsonPath = path.join(pluginBasePath, 'package.json')
- pluginBinPath = path.join(pluginBasePath, 'bin')
-
- if access(packageJsonPath, R_OK):
- with open(packageJsonPath, "r", encoding="utf-8") as f:
- packageJson = json.load(f)
- if "remote_binary" in packageJson and len(packageJson["remote_binary"]) > 0:
- # create bin directory if needed.
- chmod(pluginBasePath, 777)
- if access(pluginBasePath, W_OK):
- if not path.exists(pluginBinPath):
- mkdir(pluginBinPath)
- if not access(pluginBinPath, W_OK):
- chmod(pluginBinPath, 777)
-
- rv = True
- for remoteBinary in packageJson["remote_binary"]:
- # Required Fields. If any Remote Binary is missing these fail the install.
- binName = remoteBinary["name"]
- binURL = remoteBinary["url"]
- binHash = remoteBinary["sha256hash"]
- if not await download_remote_binary_to_path(binURL, binHash, path.join(pluginBinPath, binName)):
- rv = False
- raise Exception(f"Error Downloading Remote Binary {binName}@{binURL} with hash {binHash} to {path.join(pluginBinPath, binName)}")
-
- chown(self.plugin_path)
- chmod(pluginBasePath, 555)
- else:
- rv = True
- logger.debug(f"No Remote Binaries to Download")
-
- except Exception as e:
- rv = False
- logger.debug(str(e))
-
- return rv
-
- """Return the filename (only) for the specified plugin"""
- def find_plugin_folder(self, name: str) -> str | None:
- for folder in listdir(self.plugin_path):
- try:
- with open(path.join(self.plugin_path, folder, 'plugin.json'), "r", encoding="utf-8") as f:
- plugin = json.load(f)
-
- if plugin['name'] == name:
- return folder
- except:
- logger.debug(f"skipping {folder}")
-
- async def uninstall_plugin(self, name: str):
- if self.loader.watcher:
- self.loader.watcher.disabled = True
- tab = await get_gamepadui_tab()
- plugin_folder = self.find_plugin_folder(name)
- assert plugin_folder is not None
- plugin_dir = path.join(self.plugin_path, plugin_folder)
- try:
- logger.info("uninstalling " + name)
- logger.info(" at dir " + plugin_dir)
- logger.debug("calling frontend unload for %s" % str(name))
- res = await tab.evaluate_js(f"DeckyPluginLoader.unloadPlugin('{name}')")
- logger.debug("result of unload from UI: %s", res)
- # plugins_snapshot = self.plugins.copy()
- # snapshot_string = pformat(plugins_snapshot)
- # logger.debug("current plugins: %s", snapshot_string)
- if name in self.plugins:
- logger.debug("Plugin %s was found", name)
- self.plugins[name].stop()
- logger.debug("Plugin %s was stopped", name)
- del self.plugins[name]
- logger.debug("Plugin %s was removed from the dictionary", name)
- self.cleanup_plugin_settings(name)
- logger.debug("removing files %s" % str(name))
- rmtree(plugin_dir)
- except FileNotFoundError:
- logger.warning(f"Plugin {name} not installed, skipping uninstallation")
- except Exception as e:
- logger.error(f"Plugin {name} in {plugin_dir} was not uninstalled")
- logger.error(f"Error at {str(e)}", exc_info=e)
- if self.loader.watcher:
- self.loader.watcher.disabled = False
-
- async def _install(self, artifact: str, name: str, version: str, hash: str):
- # Will be set later in code
- res_zip = None
-
- # Check if plugin is installed
- isInstalled = False
- # Preserve plugin order before removing plugin (uninstall alters the order and removes the plugin from the list)
- current_plugin_order = self.settings.getSetting("pluginOrder")[:]
- if self.loader.watcher:
- self.loader.watcher.disabled = True
- try:
- pluginFolderPath = self.find_plugin_folder(name)
- if pluginFolderPath:
- isInstalled = True
- except:
- logger.error(f"Failed to determine if {name} is already installed, continuing anyway.")
-
- # Check if the file is a local file or a URL
- if artifact.startswith("file://"):
- logger.info(f"Installing {name} from local ZIP file (Version: {version})")
- res_zip = BytesIO(open(artifact[7:], "rb").read())
- else:
- logger.info(f"Installing {name} from URL (Version: {version})")
- async with ClientSession() as client:
- logger.debug(f"Fetching {artifact}")
- res = await client.get(artifact, ssl=get_ssl_context())
- if res.status == 200:
- logger.debug("Got 200. Reading...")
- data = await res.read()
- logger.debug(f"Read {len(data)} bytes")
- res_zip = BytesIO(data)
- else:
- logger.fatal(f"Could not fetch from URL. {await res.text()}")
-
- # Check to make sure we got the file
- if res_zip is None:
- logger.fatal(f"Could not fetch {artifact}")
- return
-
- # If plugin is installed, uninstall it
- if isInstalled:
- try:
- logger.debug("Uninstalling existing plugin...")
- await self.uninstall_plugin(name)
- except:
- logger.error(f"Plugin {name} could not be uninstalled.")
-
- # Install the plugin
- logger.debug("Unzipping...")
- ret = self._unzip_to_plugin_dir(res_zip, name, hash)
- if ret:
- plugin_folder = self.find_plugin_folder(name)
- assert plugin_folder is not None
- plugin_dir = path.join(self.plugin_path, plugin_folder)
- ret = await self._download_remote_binaries_for_plugin_with_name(plugin_dir)
- if ret:
- logger.info(f"Installed {name} (Version: {version})")
- if name in self.loader.plugins:
- self.loader.plugins[name].stop()
- self.loader.plugins.pop(name, None)
- await sleep(1)
- if not isInstalled:
- current_plugin_order = self.settings.getSetting("pluginOrder")
- current_plugin_order.append(name)
- self.settings.setSetting("pluginOrder", current_plugin_order)
- logger.debug("Plugin %s was added to the pluginOrder setting", name)
- self.loader.import_plugin(path.join(plugin_dir, "main.py"), plugin_folder)
- else:
- logger.fatal(f"Failed Downloading Remote Binaries")
- else:
- logger.fatal(f"SHA-256 Mismatch!!!! {name} (Version: {version})")
- if self.loader.watcher:
- self.loader.watcher.disabled = False
-
- async def request_plugin_install(self, artifact: str, name: str, version: str, hash: str, install_type: PluginInstallType):
- request_id = str(time())
- self.install_requests[request_id] = PluginInstallContext(artifact, name, version, hash)
- tab = await get_gamepadui_tab()
- await tab.open_websocket()
- await tab.evaluate_js(f"DeckyPluginLoader.addPluginInstallPrompt('{name}', '{version}', '{request_id}', '{hash}', {install_type})")
-
- async def request_multiple_plugin_installs(self, requests: List[PluginInstallRequest]):
- request_id = str(time())
- self.install_requests[request_id] = [PluginInstallContext(req['artifact'], req['name'], req['version'], req['hash']) for req in requests]
- js_requests_parameter = ','.join([
- f"{{ name: '{req['name']}', version: '{req['version']}', hash: '{req['hash']}', install_type: {req['install_type']}}}" for req in requests
- ])
-
- tab = await get_gamepadui_tab()
- await tab.open_websocket()
- await tab.evaluate_js(f"DeckyPluginLoader.addMultiplePluginsInstallPrompt('{request_id}', [{js_requests_parameter}])")
-
- async def confirm_plugin_install(self, request_id: str):
- requestOrRequests = self.install_requests.pop(request_id)
- if isinstance(requestOrRequests, list):
- [await self._install(req.artifact, req.name, req.version, req.hash) for req in requestOrRequests]
- else:
- await self._install(requestOrRequests.artifact, requestOrRequests.name, requestOrRequests.version, requestOrRequests.hash)
-
- def cancel_plugin_install(self, request_id: str):
- self.install_requests.pop(request_id)
-
- def cleanup_plugin_settings(self, name: str):
- """Removes any settings related to a plugin. Propably called when a plugin is uninstalled.
-
- Args:
- name (string): The name of the plugin
- """
- hidden_plugins = self.settings.getSetting("hiddenPlugins", [])
- if name in hidden_plugins:
- hidden_plugins.remove(name)
- self.settings.setSetting("hiddenPlugins", hidden_plugins)
-
-
- plugin_order = self.settings.getSetting("pluginOrder", [])
-
- if name in plugin_order:
- plugin_order.remove(name)
- self.settings.setSetting("pluginOrder", plugin_order)
-
- logger.debug("Removed any settings for plugin %s", name)
diff --git a/backend/src/customtypes.py b/backend/src/customtypes.py
deleted file mode 100644
index 84ebc235..00000000
--- a/backend/src/customtypes.py
+++ /dev/null
@@ -1,6 +0,0 @@
-from enum import Enum
-
-class UserType(Enum):
- HOST_USER = 1
- EFFECTIVE_USER = 2
- ROOT = 3 \ No newline at end of file
diff --git a/backend/src/helpers.py b/backend/src/helpers.py
deleted file mode 100644
index e3770c63..00000000
--- a/backend/src/helpers.py
+++ /dev/null
@@ -1,153 +0,0 @@
-import re
-import ssl
-import uuid
-import os
-import subprocess
-from hashlib import sha256
-from io import BytesIO
-
-import certifi
-from aiohttp.web import Request, Response, middleware
-from aiohttp.typedefs import Handler
-from aiohttp import ClientSession
-from .localplatform import localplatform
-from .customtypes import UserType
-from logging import getLogger
-
-REMOTE_DEBUGGER_UNIT = "steam-web-debug-portforward.service"
-
-# global vars
-csrf_token = str(uuid.uuid4())
-ssl_ctx = ssl.create_default_context(cafile=certifi.where())
-
-assets_regex = re.compile("^/plugins/.*/assets/.*")
-frontend_regex = re.compile("^/frontend/.*")
-logger = getLogger("Main")
-
-def get_ssl_context():
- return ssl_ctx
-
-def get_csrf_token():
- return csrf_token
-
-@middleware
-async def csrf_middleware(request: Request, handler: Handler):
- if str(request.method) == "OPTIONS" or request.headers.get('Authentication') == csrf_token or str(request.rel_url) == "/auth/token" or str(request.rel_url).startswith("/plugins/load_main/") or str(request.rel_url).startswith("/static/") or str(request.rel_url).startswith("/steam_resource/") or str(request.rel_url).startswith("/frontend/") or assets_regex.match(str(request.rel_url)) or frontend_regex.match(str(request.rel_url)):
- return await handler(request)
- return Response(text='Forbidden', status=403)
-
-# Get the default homebrew path unless a home_path is specified. home_path argument is deprecated
-def get_homebrew_path() -> str:
- return localplatform.get_unprivileged_path()
-
-# Recursively create path and chown as user
-def mkdir_as_user(path: str):
- path = os.path.realpath(path)
- os.makedirs(path, exist_ok=True)
- localplatform.chown(path)
-
-# Fetches the version of loader
-def get_loader_version() -> str:
- try:
- with open(os.path.join(os.getcwd(), ".loader.version"), "r", encoding="utf-8") as version_file:
- return version_file.readline().strip()
- except Exception as e:
- logger.warn(f"Failed to execute get_loader_version(): {str(e)}")
- return "unknown"
-
-# returns the appropriate system python paths
-def get_system_pythonpaths() -> list[str]:
- try:
- # run as normal normal user if on linux to also include user python paths
- proc = subprocess.run(["python3" if localplatform.ON_LINUX else "python", "-c", "import sys; print('\\n'.join(x for x in sys.path if x))"],
- # TODO make this less insane
- capture_output=True, user=localplatform.localplatform._get_user_id() if localplatform.ON_LINUX else None, env={} if localplatform.ON_LINUX else None) # type: ignore
- return [x.strip() for x in proc.stdout.decode().strip().split("\n")]
- except Exception as e:
- logger.warn(f"Failed to execute get_system_pythonpaths(): {str(e)}")
- return []
-
-# Download Remote Binaries to local Plugin
-async def download_remote_binary_to_path(url: str, binHash: str, path: str) -> bool:
- rv = False
- try:
- if os.access(os.path.dirname(path), os.W_OK):
- async with ClientSession() as client:
- res = await client.get(url, ssl=get_ssl_context())
- if res.status == 200:
- data = BytesIO(await res.read())
- remoteHash = sha256(data.getbuffer()).hexdigest()
- if binHash == remoteHash:
- data.seek(0)
- with open(path, 'wb') as f:
- f.write(data.getbuffer())
- rv = True
- else:
- raise Exception(f"Fatal Error: Hash Mismatch for remote binary {path}@{url}")
- else:
- rv = False
- except:
- rv = False
-
- return rv
-
-# Deprecated
-def set_user():
- pass
-
-# Deprecated
-def set_user_group() -> str:
- return get_user_group()
-
-#########
-# Below is legacy code, provided for backwards compatibility. This will break on windows
-#########
-
-# Get the user id hosting the plugin loader
-def get_user_id() -> int:
- return localplatform.localplatform._get_user_id() # pyright: ignore [reportPrivateUsage]
-
-# Get the user hosting the plugin loader
-def get_user() -> str:
- return localplatform.localplatform._get_user() # pyright: ignore [reportPrivateUsage]
-
-# Get the effective user id of the running process
-def get_effective_user_id() -> int:
- return localplatform.localplatform._get_effective_user_id() # pyright: ignore [reportPrivateUsage]
-
-# Get the effective user of the running process
-def get_effective_user() -> str:
- return localplatform.localplatform._get_effective_user() # pyright: ignore [reportPrivateUsage]
-
-# Get the effective user group id of the running process
-def get_effective_user_group_id() -> int:
- return localplatform.localplatform._get_effective_user_group_id() # pyright: ignore [reportPrivateUsage]
-
-# Get the effective user group of the running process
-def get_effective_user_group() -> str:
- return localplatform.localplatform._get_effective_user_group() # pyright: ignore [reportPrivateUsage]
-
-# Get the user owner of the given file path.
-def get_user_owner(file_path: str) -> str:
- return localplatform.localplatform._get_user_owner(file_path) # pyright: ignore [reportPrivateUsage]
-
-# Get the user group of the given file path, or the user group hosting the plugin loader
-def get_user_group(file_path: str | None = None) -> str:
- return localplatform.localplatform._get_user_group(file_path) # pyright: ignore [reportPrivateUsage]
-
-# Get the group id of the user hosting the plugin loader
-def get_user_group_id() -> int:
- return localplatform.localplatform._get_user_group_id() # pyright: ignore [reportPrivateUsage]
-
-# Get the default home path unless a user is specified
-def get_home_path(username: str | None = None) -> str:
- return localplatform.get_home_path(UserType.ROOT if username == "root" else UserType.HOST_USER)
-
-async def is_systemd_unit_active(unit_name: str) -> bool:
- return await localplatform.service_active(unit_name)
-
-async def stop_systemd_unit(unit_name: str) -> bool:
- return await localplatform.service_stop(unit_name)
-
-async def start_systemd_unit(unit_name: str) -> bool:
- return await localplatform.service_start(unit_name)
diff --git a/backend/src/injector.py b/backend/src/injector.py
deleted file mode 100644
index a217f689..00000000
--- a/backend/src/injector.py
+++ /dev/null
@@ -1,438 +0,0 @@
-# Injector code from https://github.com/SteamDeckHomebrew/steamdeck-ui-inject. More info on how it works there.
-
-from asyncio import sleep
-from logging import getLogger
-from typing import Any, Callable, List, TypedDict, Dict
-
-from aiohttp import ClientSession
-from aiohttp.client_exceptions import ClientConnectorError, ClientOSError
-from asyncio.exceptions import TimeoutError
-import uuid
-
-BASE_ADDRESS = "http://localhost:8080"
-
-logger = getLogger("Injector")
-
-class _TabResponse(TypedDict):
- title: str
- id: str
- url: str
- webSocketDebuggerUrl: str
-
-class Tab:
- cmd_id = 0
-
- def __init__(self, res: _TabResponse) -> None:
- self.title: str = res["title"]
- self.id: str = res["id"]
- self.url: str = res["url"]
- self.ws_url: str = res["webSocketDebuggerUrl"]
-
- self.websocket = None
- self.client = None
-
- async def open_websocket(self):
- self.client = ClientSession()
- self.websocket = await self.client.ws_connect(self.ws_url) # type: ignore
-
- async def close_websocket(self):
- if self.websocket:
- await self.websocket.close()
- if self.client:
- await self.client.close()
-
- async def listen_for_message(self):
- if self.websocket:
- async for message in self.websocket:
- data = message.json()
- yield data
- logger.warn(f"The Tab {self.title} socket has been disconnected while listening for messages.")
- await self.close_websocket()
-
- async def _send_devtools_cmd(self, dc: Dict[str, Any], receive: bool = True):
- if self.websocket:
- self.cmd_id += 1
- dc["id"] = self.cmd_id
- await self.websocket.send_json(dc)
- if receive:
- async for msg in self.listen_for_message():
- if "id" in msg and msg["id"] == dc["id"]:
- return msg
- return None
- raise RuntimeError("Websocket not opened")
-
- async def evaluate_js(self, js: str, run_async: bool | None = False, manage_socket: bool | None = True, get_result: bool = True):
- try:
- if manage_socket:
- await self.open_websocket()
-
- res = await self._send_devtools_cmd({
- "method": "Runtime.evaluate",
- "params": {
- "expression": js,
- "userGesture": True,
- "awaitPromise": run_async
- }
- }, get_result)
-
- finally:
- if manage_socket:
- await self.close_websocket()
- return res
-
- async def has_global_var(self, var_name: str, manage_socket: bool = True):
- res = await self.evaluate_js(f"window['{var_name}'] !== null && window['{var_name}'] !== undefined", False, manage_socket)
- assert res is not None
-
- if not "result" in res or not "result" in res["result"] or not "value" in res["result"]["result"]:
- return False
-
- return res["result"]["result"]["value"]
-
- async def close(self, manage_socket: bool = True):
- try:
- if manage_socket:
- await self.open_websocket()
-
- res = await self._send_devtools_cmd({
- "method": "Page.close",
- }, False)
-
- finally:
- if manage_socket:
- await self.close_websocket()
- return res
-
- async def enable(self):
- """
- Enables page domain notifications.
- """
- await self._send_devtools_cmd({
- "method": "Page.enable",
- }, False)
-
- async def disable(self):
- """
- Disables page domain notifications.
- """
- await self._send_devtools_cmd({
- "method": "Page.disable",
- }, False)
-
- async def refresh(self, manage_socket: bool = True):
- try:
- if manage_socket:
- await self.open_websocket()
-
- await self._send_devtools_cmd({
- "method": "Page.reload",
- }, False)
-
- finally:
- if manage_socket:
- await self.close_websocket()
-
- return
- async def reload_and_evaluate(self, js: str, manage_socket: bool = True):
- """
- Reloads the current tab, with JS to run on load via debugger
- """
- try:
- if manage_socket:
- await self.open_websocket()
-
- await self._send_devtools_cmd({
- "method": "Debugger.enable"
- }, True)
-
- await self._send_devtools_cmd({
- "method": "Runtime.evaluate",
- "params": {
- "expression": "location.reload();",
- "userGesture": True,
- "awaitPromise": False
- }
- }, False)
-
- breakpoint_res = await self._send_devtools_cmd({
- "method": "Debugger.setInstrumentationBreakpoint",
- "params": {
- "instrumentation": "beforeScriptExecution"
- }
- }, True)
-
- assert breakpoint_res is not None
-
- logger.info(breakpoint_res)
-
- # Page finishes loading when breakpoint hits
-
- for _ in range(20):
- # this works around 1/5 of the time, so just send it 8 times.
- # the js accounts for being injected multiple times allowing only one instance to run at a time anyway
- await self._send_devtools_cmd({
- "method": "Runtime.evaluate",
- "params": {
- "expression": js,
- "userGesture": True,
- "awaitPromise": False
- }
- }, False)
-
- await self._send_devtools_cmd({
- "method": "Debugger.removeBreakpoint",
- "params": {
- "breakpointId": breakpoint_res["result"]["breakpointId"]
- }
- }, False)
-
- for _ in range(4):
- await self._send_devtools_cmd({
- "method": "Debugger.resume"
- }, False)
-
- await self._send_devtools_cmd({
- "method": "Debugger.disable"
- }, True)
-
- finally:
- if manage_socket:
- await self.close_websocket()
- return
-
- async def add_script_to_evaluate_on_new_document(self, js: str, add_dom_wrapper: bool = True, manage_socket: bool = True, get_result: bool = True):
- """
- How the underlying call functions is not particularly clear from the devtools docs, so stealing puppeteer's description:
-
- Adds a function which would be invoked in one of the following scenarios:
- * whenever the page is navigated
- * whenever the child frame is attached or navigated. In this case, the
- function is invoked in the context of the newly attached frame.
-
- The function is invoked after the document was created but before any of
- its scripts were run. This is useful to amend the JavaScript environment,
- e.g. to seed `Math.random`.
-
- Parameters
- ----------
- js : str
- The script to evaluate on new document
- add_dom_wrapper : bool
- True to wrap the script in a wait for the 'DOMContentLoaded' event.
- DOM will usually not exist when this execution happens,
- so it is necessary to delay til DOM is loaded if you are modifying it
- manage_socket : bool
- True to have this function handle opening/closing the websocket for this tab
- get_result : bool
- True to wait for the result of this call
-
- Returns
- -------
- int or None
- The identifier of the script added, used to remove it later.
- (see remove_script_to_evaluate_on_new_document below)
- None is returned if `get_result` is False
- """
- try:
-
- wrappedjs = """
- function scriptFunc() {
- {js}
- }
- if (document.readyState === 'loading') {
- addEventListener('DOMContentLoaded', () => {
- scriptFunc();
- });
- } else {
- scriptFunc();
- }
- """.format(js=js) if add_dom_wrapper else js
-
- if manage_socket:
- await self.open_websocket()
-
- res = await self._send_devtools_cmd({
- "method": "Page.addScriptToEvaluateOnNewDocument",
- "params": {
- "source": wrappedjs
- }
- }, get_result)
-
- finally:
- if manage_socket:
- await self.close_websocket()
- return res
-
- async def remove_script_to_evaluate_on_new_document(self, script_id: str, manage_socket: bool = True):
- """
- Removes a script from a page that was added with `add_script_to_evaluate_on_new_document`
-
- Parameters
- ----------
- script_id : int
- The identifier of the script to remove (returned from `add_script_to_evaluate_on_new_document`)
- """
-
- try:
- if manage_socket:
- await self.open_websocket()
-
- await self._send_devtools_cmd({
- "method": "Page.removeScriptToEvaluateOnNewDocument",
- "params": {
- "identifier": script_id
- }
- }, False)
-
- finally:
- if manage_socket:
- await self.close_websocket()
-
- async def has_element(self, element_name: str, manage_socket: bool = True):
- res = await self.evaluate_js(f"document.getElementById('{element_name}') != null", False, manage_socket)
- assert res is not None
-
- if not "result" in res or not "result" in res["result"] or not "value" in res["result"]["result"]:
- return False
-
- return res["result"]["result"]["value"]
-
- async def inject_css(self, style: str, manage_socket: bool = True):
- try:
- css_id = str(uuid.uuid4())
-
- result = await self.evaluate_js(
- f"""
- (function() {{
- const style = document.createElement('style');
- style.id = "{css_id}";
- document.head.append(style);
- style.textContent = `{style}`;
- }})()
- """, False, manage_socket)
-
- assert result is not None
-
- if "exceptionDetails" in result["result"]:
- return {
- "success": False,
- "result": result["result"]
- }
-
- return {
- "success": True,
- "result": css_id
- }
- except Exception as e:
- return {
- "success": False,
- "result": e
- }
-
- async def remove_css(self, css_id: str, manage_socket: bool = True):
- try:
- result = await self.evaluate_js(
- f"""
- (function() {{
- let style = document.getElementById("{css_id}");
-
- if (style.nodeName.toLowerCase() == 'style')
- style.parentNode.removeChild(style);
- }})()
- """, False, manage_socket)
-
- assert result is not None
-
- if "exceptionDetails" in result["result"]:
- return {
- "success": False,
- "result": result
- }
-
- return {
- "success": True
- }
- except Exception as e:
- return {
- "success": False,
- "result": e
- }
-
- async def get_steam_resource(self, url: str):
- res = await self.evaluate_js(f'(async function test() {{ return await (await fetch("{url}")).text() }})()', True)
- assert res is not None
- return res["result"]["result"]["value"]
-
- def __repr__(self):
- return self.title
-
-
-async def get_tabs() -> List[Tab]:
- res = {}
-
- na = False
- while True:
- try:
- async with ClientSession() as web:
- res = await web.get(f"{BASE_ADDRESS}/json", timeout=3)
- except ClientConnectorError:
- if not na:
- logger.debug("Steam isn't available yet. Wait for a moment...")
- na = True
- await sleep(5)
- except ClientOSError:
- logger.warn(f"The request to {BASE_ADDRESS}/json was reset")
- await sleep(1)
- except TimeoutError:
- logger.warn(f"The request to {BASE_ADDRESS}/json timed out")
- await sleep(1)
- else:
- break
-
- if res.status == 200:
- r = await res.json()
- return [Tab(i) for i in r]
- else:
- raise Exception(f"/json did not return 200. {await res.text()}")
-
-
-async def get_tab(tab_name: str) -> Tab:
- tabs = await get_tabs()
- tab = next((i for i in tabs if i.title == tab_name), None)
- if not tab:
- raise ValueError(f"Tab {tab_name} not found")
- return tab
-
-async def get_tab_lambda(test: Callable[[Tab], bool]) -> Tab:
- tabs = await get_tabs()
- tab = next((i for i in tabs if test(i)), None)
- if not tab:
- raise ValueError(f"Tab not found by lambda")
- return tab
-
-SHARED_CTX_NAMES = ["SharedJSContext", "Steam Shared Context presented by Valveâ„¢", "Steam", "SP"]
-CLOSEABLE_URLS = ["about:blank", "data:text/html,%3Cbody%3E%3C%2Fbody%3E"] # Closing anything other than these *really* likes to crash Steam
-DO_NOT_CLOSE_URL = "Valve Steam Gamepad/default" # Steam Big Picture Mode tab
-
-def tab_is_gamepadui(t: Tab) -> bool:
- return "https://steamloopback.host/routes/" in t.url and t.title in SHARED_CTX_NAMES
-
-async def get_gamepadui_tab() -> Tab:
- tabs = await get_tabs()
- tab = next((i for i in tabs if tab_is_gamepadui(i)), None)
- if not tab:
- raise ValueError(f"GamepadUI Tab not found")
- return tab
-
-async def inject_to_tab(tab_name: str, js: str, run_async: bool = False):
- tab = await get_tab(tab_name)
-
- return await tab.evaluate_js(js, run_async)
-
-async def close_old_tabs():
- tabs = await get_tabs()
- for t in tabs:
- if not t.title or (t.title not in SHARED_CTX_NAMES and any(url in t.url for url in CLOSEABLE_URLS) and DO_NOT_CLOSE_URL not in t.url):
- logger.debug("Closing tab: " + getattr(t, "title", "Untitled"))
- await t.close()
- await sleep(0.5)
diff --git a/backend/src/loader.py b/backend/src/loader.py
deleted file mode 100644
index 7567912c..00000000
--- a/backend/src/loader.py
+++ /dev/null
@@ -1,200 +0,0 @@
-from __future__ import annotations
-from asyncio import AbstractEventLoop, Queue, sleep
-from json.decoder import JSONDecodeError
-from logging import getLogger
-from os import listdir, path
-from pathlib import Path
-from traceback import print_exc
-from typing import Any, Tuple
-
-from aiohttp import web
-from os.path import exists
-from watchdog.events import RegexMatchingEventHandler, DirCreatedEvent, DirModifiedEvent, FileCreatedEvent, FileModifiedEvent # type: ignore
-from watchdog.observers import Observer # type: ignore
-
-from typing import TYPE_CHECKING
-if TYPE_CHECKING:
- from .main import PluginManager
-
-from .injector import get_gamepadui_tab
-from .plugin.plugin import PluginWrapper
-
-Plugins = dict[str, PluginWrapper]
-ReloadQueue = Queue[Tuple[str, str, bool | None] | Tuple[str, str]]
-
-#TODO: Remove placeholder method
-async def log_plugin_emitted_message(message: Any):
- getLogger().debug(f"EMITTED MESSAGE: " + str(message))
-
-class FileChangeHandler(RegexMatchingEventHandler):
- def __init__(self, queue: ReloadQueue, plugin_path: str) -> None:
- super().__init__(regexes=[r'^.*?dist\/index\.js$', r'^.*?main\.py$']) # type: ignore
- self.logger = getLogger("file-watcher")
- self.plugin_path = plugin_path
- self.queue = queue
- self.disabled = True
-
- def maybe_reload(self, src_path: str):
- if self.disabled:
- return
- plugin_dir = Path(path.relpath(src_path, self.plugin_path)).parts[0]
- if exists(path.join(self.plugin_path, plugin_dir, "plugin.json")):
- self.queue.put_nowait((path.join(self.plugin_path, plugin_dir, "main.py"), plugin_dir, True))
-
- def on_created(self, event: DirCreatedEvent | FileCreatedEvent):
- src_path = event.src_path
- if "__pycache__" in src_path:
- return
-
- # check to make sure this isn't a directory
- if path.isdir(src_path):
- return
-
- # get the directory name of the plugin so that we can find its "main.py" and reload it; the
- # file that changed is not necessarily the one that needs to be reloaded
- self.logger.debug(f"file created: {src_path}")
- self.maybe_reload(src_path)
-
- def on_modified(self, event: DirModifiedEvent | FileModifiedEvent):
- src_path = event.src_path
- if "__pycache__" in src_path:
- return
-
- # check to make sure this isn't a directory
- if path.isdir(src_path):
- return
-
- # get the directory name of the plugin so that we can find its "main.py" and reload it; the
- # file that changed is not necessarily the one that needs to be reloaded
- self.logger.debug(f"file modified: {src_path}")
- self.maybe_reload(src_path)
-
-class Loader:
- def __init__(self, server_instance: PluginManager, plugin_path: str, loop: AbstractEventLoop, live_reload: bool = False) -> None:
- self.loop = loop
- self.logger = getLogger("Loader")
- self.plugin_path = plugin_path
- self.logger.info(f"plugin_path: {self.plugin_path}")
- self.plugins: Plugins = {}
- self.watcher = None
- self.live_reload = live_reload
- self.reload_queue: ReloadQueue = Queue()
- self.loop.create_task(self.handle_reloads())
-
- if live_reload:
- self.observer = Observer()
- self.watcher = FileChangeHandler(self.reload_queue, plugin_path)
- self.observer.schedule(self.watcher, self.plugin_path, recursive=True) # type: ignore
- self.observer.start()
- self.loop.create_task(self.enable_reload_wait())
-
- server_instance.web_app.add_routes([
- web.get("/frontend/{path:.*}", self.handle_frontend_assets),
- web.get("/locales/{path:.*}", self.handle_frontend_locales),
- web.get("/plugins", self.get_plugins),
- web.get("/plugins/{plugin_name}/frontend_bundle", self.handle_frontend_bundle),
- web.post("/plugins/{plugin_name}/methods/{method_name}", self.handle_plugin_method_call),
- web.get("/plugins/{plugin_name}/assets/{path:.*}", self.handle_plugin_frontend_assets),
- web.post("/plugins/{plugin_name}/reload", self.handle_backend_reload_request)
- ])
-
- async def enable_reload_wait(self):
- if self.live_reload:
- await sleep(10)
- if self.watcher:
- self.logger.info("Hot reload enabled")
- self.watcher.disabled = False
-
- async def handle_frontend_assets(self, request: web.Request):
- file = Path(__file__).parents[1].joinpath("static").joinpath(request.match_info["path"])
- return web.FileResponse(file, headers={"Cache-Control": "no-cache"})
-
- async def handle_frontend_locales(self, request: web.Request):
- req_lang = request.match_info["path"]
- file = Path(__file__).parents[1].joinpath("locales").joinpath(req_lang)
- if exists(file):
- return web.FileResponse(file, headers={"Cache-Control": "no-cache", "Content-Type": "application/json"})
- else:
- self.logger.info(f"Language {req_lang} not available, returning an empty dictionary")
- return web.json_response(data={}, headers={"Cache-Control": "no-cache"})
-
- async def get_plugins(self, request: web.Request):
- plugins = list(self.plugins.values())
- return web.json_response([{"name": str(i), "version": i.version} for i in plugins])
-
- async def handle_plugin_frontend_assets(self, request: web.Request):
- plugin = self.plugins[request.match_info["plugin_name"]]
- file = path.join(self.plugin_path, plugin.plugin_directory, "dist/assets", request.match_info["path"])
-
- return web.FileResponse(file, headers={"Cache-Control": "no-cache"})
-
- async def handle_frontend_bundle(self, request: web.Request):
- plugin = self.plugins[request.match_info["plugin_name"]]
-
- with open(path.join(self.plugin_path, plugin.plugin_directory, "dist/index.js"), "r", encoding="utf-8") as bundle:
- return web.Response(text=bundle.read(), content_type="application/javascript")
-
- def import_plugin(self, file: str, plugin_directory: str, refresh: bool | None = False, batch: bool | None = False):
- try:
- plugin = PluginWrapper(file, plugin_directory, self.plugin_path)
- if plugin.name in self.plugins:
- if not "debug" in plugin.flags and refresh:
- self.logger.info(f"Plugin {plugin.name} is already loaded and has requested to not be re-loaded")
- return
- else:
- self.plugins[plugin.name].stop()
- self.plugins.pop(plugin.name, None)
- if plugin.passive:
- self.logger.info(f"Plugin {plugin.name} is passive")
- self.plugins[plugin.name] = plugin.start()
- self.plugins[plugin.name].set_emitted_message_callback(log_plugin_emitted_message)
- self.logger.info(f"Loaded {plugin.name}")
- if not batch:
- self.loop.create_task(self.dispatch_plugin(plugin.name, plugin.version))
- except Exception as e:
- self.logger.error(f"Could not load {file}. {e}")
- print_exc()
-
- async def dispatch_plugin(self, name: str, version: str | None):
- gpui_tab = await get_gamepadui_tab()
- await gpui_tab.evaluate_js(f"window.importDeckyPlugin('{name}', '{version}')")
-
- def import_plugins(self):
- self.logger.info(f"import plugins from {self.plugin_path}")
-
- directories = [i for i in listdir(self.plugin_path) if path.isdir(path.join(self.plugin_path, i)) and path.isfile(path.join(self.plugin_path, i, "plugin.json"))]
- for directory in directories:
- self.logger.info(f"found plugin: {directory}")
- self.import_plugin(path.join(self.plugin_path, directory, "main.py"), directory, False, True)
-
- async def handle_reloads(self):
- while True:
- args = await self.reload_queue.get()
- self.import_plugin(*args) # type: ignore
-
- async def handle_plugin_method_call(self, request: web.Request):
- res = {}
- plugin = self.plugins[request.match_info["plugin_name"]]
- method_name = request.match_info["method_name"]
- try:
- method_info = await request.json()
- args: Any = method_info["args"]
- except JSONDecodeError:
- args = {}
- try:
- if method_name.startswith("_"):
- raise RuntimeError("Tried to call private method")
- res["result"] = await plugin.execute_method(method_name, args)
- res["success"] = True
- except Exception as e:
- res["result"] = str(e)
- res["success"] = False
- return web.json_response(res)
-
- async def handle_backend_reload_request(self, request: web.Request):
- plugin_name : str = request.match_info["plugin_name"]
- plugin = self.plugins[plugin_name]
-
- await self.reload_queue.put((plugin.file, plugin.plugin_directory))
-
- return web.Response(status=200) \ No newline at end of file
diff --git a/backend/src/localplatform/localplatform.py b/backend/src/localplatform/localplatform.py
deleted file mode 100644
index 028eff8f..00000000
--- a/backend/src/localplatform/localplatform.py
+++ /dev/null
@@ -1,52 +0,0 @@
-import platform, os
-
-ON_WINDOWS = platform.system() == "Windows"
-ON_LINUX = not ON_WINDOWS
-
-if ON_WINDOWS:
- from .localplatformwin import *
- from . import localplatformwin as localplatform
-else:
- from .localplatformlinux import *
- from . import localplatformlinux as localplatform
-
-def get_privileged_path() -> str:
- '''Get path accessible by elevated user. Holds plugins, decky loader and decky loader configs'''
- return localplatform.get_privileged_path()
-
-def get_unprivileged_path() -> str:
- '''Get path accessible by non-elevated user. Holds plugin configuration, plugin data and plugin logs. Externally referred to as the 'Homebrew' directory'''
- return localplatform.get_unprivileged_path()
-
-def get_unprivileged_user() -> str:
- '''Get user that should own files made in unprivileged path'''
- return localplatform.get_unprivileged_user()
-
-def get_chown_plugin_path() -> bool:
- return os.getenv("CHOWN_PLUGIN_PATH", "1") == "1"
-
-def get_server_host() -> str:
- return os.getenv("SERVER_HOST", "127.0.0.1")
-
-def get_server_port() -> int:
- return int(os.getenv("SERVER_PORT", "1337"))
-
-def get_live_reload() -> bool:
- return os.getenv("LIVE_RELOAD", "1") == "1"
-
-def get_keep_systemd_service() -> bool:
- return os.getenv("KEEP_SYSTEMD_SERVICE", "0") == "1"
-
-def get_log_level() -> int:
- return {"CRITICAL": 50, "ERROR": 40, "WARNING": 30, "INFO": 20, "DEBUG": 10}[
- os.getenv("LOG_LEVEL", "INFO")
- ]
-
-def get_selinux() -> bool:
- if ON_LINUX:
- from subprocess import check_output
- try:
- if (check_output("getenforce").decode("ascii").strip("\n") == "Enforcing"): return True
- except FileNotFoundError:
- pass
- return False
diff --git a/backend/src/localplatform/localplatformlinux.py b/backend/src/localplatform/localplatformlinux.py
deleted file mode 100644
index 1ec3fc1a..00000000
--- a/backend/src/localplatform/localplatformlinux.py
+++ /dev/null
@@ -1,192 +0,0 @@
-import os, pwd, grp, sys, logging
-from subprocess import call, run, DEVNULL, PIPE, STDOUT
-from ..customtypes import UserType
-
-logger = logging.getLogger("localplatform")
-
-# Get the user id hosting the plugin loader
-def _get_user_id() -> int:
- return pwd.getpwnam(_get_user()).pw_uid
-
-# Get the user hosting the plugin loader
-def _get_user() -> str:
- return get_unprivileged_user()
-
-# Get the effective user id of the running process
-def _get_effective_user_id() -> int:
- return os.geteuid()
-
-# Get the effective user of the running process
-def _get_effective_user() -> str:
- return pwd.getpwuid(_get_effective_user_id()).pw_name
-
-# Get the effective user group id of the running process
-def _get_effective_user_group_id() -> int:
- return os.getegid()
-
-# Get the effective user group of the running process
-def _get_effective_user_group() -> str:
- return grp.getgrgid(_get_effective_user_group_id()).gr_name
-
-# Get the user owner of the given file path.
-def _get_user_owner(file_path: str) -> str:
- return pwd.getpwuid(os.stat(file_path).st_uid).pw_name
-
-# Get the user group of the given file path, or the user group hosting the plugin loader
-def _get_user_group(file_path: str | None = None) -> str:
- return grp.getgrgid(os.stat(file_path).st_gid if file_path is not None else _get_user_group_id()).gr_name
-
-# Get the group id of the user hosting the plugin loader
-def _get_user_group_id() -> int:
- return pwd.getpwuid(_get_user_id()).pw_gid
-
-def chown(path : str, user : UserType = UserType.HOST_USER, recursive : bool = True) -> bool:
- user_str = ""
-
- if user == UserType.HOST_USER:
- user_str = _get_user()+":"+_get_user_group()
- elif user == UserType.EFFECTIVE_USER:
- user_str = _get_effective_user()+":"+_get_effective_user_group()
- elif user == UserType.ROOT:
- user_str = "root:root"
- else:
- raise Exception("Unknown User Type")
-
- result = call(["chown", "-R", user_str, path] if recursive else ["chown", user_str, path])
- return result == 0
-
-def chmod(path : str, permissions : int, recursive : bool = True) -> bool:
- if _get_effective_user_id() != 0:
- return True
- result = call(["chmod", "-R", str(permissions), path] if recursive else ["chmod", str(permissions), path])
- return result == 0
-
-def folder_owner(path : str) -> UserType|None:
- user_owner = _get_user_owner(path)
-
- if (user_owner == _get_user()):
- return UserType.HOST_USER
-
- elif (user_owner == _get_effective_user()):
- return UserType.EFFECTIVE_USER
-
- else:
- return None
-
-def get_home_path(user : UserType = UserType.HOST_USER) -> str:
- user_name = "root"
-
- if user == UserType.HOST_USER:
- user_name = _get_user()
- elif user == UserType.EFFECTIVE_USER:
- user_name = _get_effective_user()
- elif user == UserType.ROOT:
- pass
- else:
- raise Exception("Unknown User Type")
-
- return pwd.getpwnam(user_name).pw_dir
-
-def get_username() -> str:
- return _get_user()
-
-def setgid(user : UserType = UserType.HOST_USER):
- user_id = 0
-
- if user == UserType.HOST_USER:
- user_id = _get_user_group_id()
- elif user == UserType.ROOT:
- pass
- else:
- raise Exception("Unknown user type")
-
- os.setgid(user_id)
-
-def setuid(user : UserType = UserType.HOST_USER):
- user_id = 0
-
- if user == UserType.HOST_USER:
- user_id = _get_user_id()
- elif user == UserType.ROOT:
- pass
- else:
- raise Exception("Unknown user type")
-
- os.setuid(user_id)
-
-async def service_active(service_name : str) -> bool:
- res = run(["systemctl", "is-active", service_name], stdout=DEVNULL, stderr=DEVNULL)
- return res.returncode == 0
-
-async def service_restart(service_name : str) -> bool:
- call(["systemctl", "daemon-reload"])
- cmd = ["systemctl", "restart", service_name]
- res = run(cmd, stdout=PIPE, stderr=STDOUT)
- return res.returncode == 0
-
-async def service_stop(service_name : str) -> bool:
- cmd = ["systemctl", "stop", service_name]
- res = run(cmd, stdout=PIPE, stderr=STDOUT)
- return res.returncode == 0
-
-async def service_start(service_name : str) -> bool:
- cmd = ["systemctl", "start", service_name]
- res = run(cmd, stdout=PIPE, stderr=STDOUT)
- return res.returncode == 0
-
-def get_privileged_path() -> str:
- path = os.getenv("PRIVILEGED_PATH")
-
- if path == None:
- path = get_unprivileged_path()
-
- return path
-
-def _parent_dir(path : str | None) -> str | None:
- if path == None:
- return None
-
- if path.endswith('/'):
- path = path[:-1]
-
- return os.path.dirname(path)
-
-def get_unprivileged_path() -> str:
- path = os.getenv("UNPRIVILEGED_PATH")
-
- if path == None:
- path = _parent_dir(os.getenv("PLUGIN_PATH"))
-
- if path == None:
- logger.debug("Unprivileged path is not properly configured. Making something up!")
- # Expected path of loader binary is /home/deck/homebrew/service/PluginLoader
- path = _parent_dir(_parent_dir(os.path.realpath(sys.argv[0])))
-
- if path != None and not os.path.exists(path):
- path = None
-
- if path == None:
- logger.warn("Unprivileged path is not properly configured. Defaulting to /home/deck/homebrew")
- path = "/home/deck/homebrew" # We give up
-
- return path
-
-
-def get_unprivileged_user() -> str:
- user = os.getenv("UNPRIVILEGED_USER")
-
- if user == None:
- # Lets hope we can extract it from the unprivileged dir
- dir = os.path.realpath(get_unprivileged_path())
-
- pws = sorted(pwd.getpwall(), reverse=True, key=lambda pw: len(pw.pw_dir))
- for pw in pws:
- if dir.startswith(os.path.realpath(pw.pw_dir)):
- user = pw.pw_name
- break
-
- if user == None:
- logger.warn("Unprivileged user is not properly configured. Defaulting to 'deck'")
- user = 'deck'
-
- return user
diff --git a/backend/src/localplatform/localplatformwin.py b/backend/src/localplatform/localplatformwin.py
deleted file mode 100644
index 212ff2fe..00000000
--- a/backend/src/localplatform/localplatformwin.py
+++ /dev/null
@@ -1,53 +0,0 @@
-from ..customtypes import UserType
-import os, sys
-
-def chown(path : str, user : UserType = UserType.HOST_USER, recursive : bool = True) -> bool:
- return True # Stubbed
-
-def chmod(path : str, permissions : int, recursive : bool = True) -> bool:
- return True # Stubbed
-
-def folder_owner(path : str) -> UserType|None:
- return UserType.HOST_USER # Stubbed
-
-def get_home_path(user : UserType = UserType.HOST_USER) -> str:
- return os.path.expanduser("~") # Mostly stubbed
-
-def setgid(user : UserType = UserType.HOST_USER):
- pass # Stubbed
-
-def setuid(user : UserType = UserType.HOST_USER):
- pass # Stubbed
-
-async def service_active(service_name : str) -> bool:
- return True # Stubbed
-
-async def service_stop(service_name : str) -> bool:
- return True # Stubbed
-
-async def service_start(service_name : str) -> bool:
- return True # Stubbed
-
-async def service_restart(service_name : str) -> bool:
- if service_name == "plugin_loader":
- sys.exit(42)
-
- return True # Stubbed
-
-def get_username() -> str:
- return os.getlogin()
-
-def get_privileged_path() -> str:
- '''On windows, privileged_path is equal to unprivileged_path'''
- return get_unprivileged_path()
-
-def get_unprivileged_path() -> str:
- path = os.getenv("UNPRIVILEGED_PATH")
-
- if path == None:
- path = os.getenv("PRIVILEGED_PATH", os.path.join(os.path.expanduser("~"), "homebrew"))
-
- return path
-
-def get_unprivileged_user() -> str:
- return os.getenv("UNPRIVILEGED_USER", os.getlogin())
diff --git a/backend/src/localplatform/localsocket.py b/backend/src/localplatform/localsocket.py
deleted file mode 100644
index 93b1ea18..00000000
--- a/backend/src/localplatform/localsocket.py
+++ /dev/null
@@ -1,145 +0,0 @@
-import asyncio, time
-from typing import Any, Callable, Coroutine
-import random
-
-from .localplatform import ON_WINDOWS
-
-BUFFER_LIMIT = 2 ** 20 # 1 MiB
-
-class UnixSocket:
- def __init__(self, on_new_message: Callable[[str], Coroutine[Any, Any, Any]]):
- '''
- on_new_message takes 1 string argument.
- It's return value gets used, if not None, to write data to the socket.
- Method should be async
- '''
- self.socket_addr = f"/tmp/plugin_socket_{time.time()}"
- self.on_new_message = on_new_message
- self.socket = None
- self.reader = None
- self.writer = None
- self.server_writer = None
-
- async def setup_server(self):
- self.socket = await asyncio.start_unix_server(self._listen_for_method_call, path=self.socket_addr, limit=BUFFER_LIMIT)
-
- async def _open_socket_if_not_exists(self):
- if not self.reader:
- retries = 0
- while retries < 10:
- try:
- self.reader, self.writer = await asyncio.open_unix_connection(self.socket_addr, limit=BUFFER_LIMIT)
- return True
- except:
- await asyncio.sleep(2)
- retries += 1
- return False
- else:
- return True
-
- async def get_socket_connection(self):
- if not await self._open_socket_if_not_exists():
- return None, None
-
- return self.reader, self.writer
-
- async def close_socket_connection(self):
- if self.writer != None:
- self.writer.close()
-
- self.reader = None
-
- async def read_single_line(self) -> str|None:
- reader, _ = await self.get_socket_connection()
-
- try:
- assert reader
- except AssertionError:
- return
-
- return await self._read_single_line(reader)
-
- async def write_single_line(self, message : str):
- _, writer = await self.get_socket_connection()
-
- try:
- assert writer
- except AssertionError:
- return
-
- await self._write_single_line(writer, message)
-
- async def _read_single_line(self, reader: asyncio.StreamReader) -> str:
- line = bytearray()
- while True:
- try:
- line.extend(await reader.readuntil())
- except asyncio.LimitOverrunError:
- line.extend(await reader.read(reader._limit)) # type: ignore
- continue
- except asyncio.IncompleteReadError as err:
- line.extend(err.partial)
- break
- else:
- break
-
- return line.decode("utf-8")
-
- async def _write_single_line(self, writer: asyncio.StreamWriter, message : str):
- if not message.endswith("\n"):
- message += "\n"
-
- writer.write(message.encode("utf-8"))
- await writer.drain()
-
- async def write_single_line_server(self, message: str):
- if self.server_writer is None:
- return
- await self._write_single_line(self.server_writer, message)
-
- async def _listen_for_method_call(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
- self.server_writer = writer
- while True:
-
- def _(task: asyncio.Task[str|None]):
- res = task.result()
- if res is not None:
- asyncio.create_task(self._write_single_line(writer, res))
-
- line = await self._read_single_line(reader)
- asyncio.create_task(self.on_new_message(line)).add_done_callback(_)
-
-class PortSocket (UnixSocket):
- def __init__(self, on_new_message: Callable[[str], Coroutine[Any, Any, Any]]):
- '''
- on_new_message takes 1 string argument.
- It's return value gets used, if not None, to write data to the socket.
- Method should be async
- '''
- super().__init__(on_new_message)
- self.host = "127.0.0.1"
- self.port = random.sample(range(40000, 60000), 1)[0]
-
- async def setup_server(self):
- self.socket = await asyncio.start_server(self._listen_for_method_call, host=self.host, port=self.port, limit=BUFFER_LIMIT)
-
- async def _open_socket_if_not_exists(self):
- if not self.reader:
- retries = 0
- while retries < 10:
- try:
- self.reader, self.writer = await asyncio.open_connection(host=self.host, port=self.port, limit=BUFFER_LIMIT)
- return True
- except:
- await asyncio.sleep(2)
- retries += 1
- return False
- else:
- return True
-
-if ON_WINDOWS:
- class LocalSocket (PortSocket): # type: ignore
- pass
-else:
- class LocalSocket (UnixSocket):
- pass \ No newline at end of file
diff --git a/backend/src/main.py b/backend/src/main.py
deleted file mode 100644
index 86c4720d..00000000
--- a/backend/src/main.py
+++ /dev/null
@@ -1,191 +0,0 @@
-# Change PyInstaller files permissions
-import sys
-from typing import Dict
-from .localplatform.localplatform import (chmod, chown, service_stop, service_start,
- ON_WINDOWS, get_log_level, get_live_reload,
- get_server_port, get_server_host, get_chown_plugin_path,
- get_privileged_path)
-if hasattr(sys, '_MEIPASS'):
- chmod(sys._MEIPASS, 755) # type: ignore
-# Full imports
-from asyncio import AbstractEventLoop, new_event_loop, set_event_loop, sleep
-from logging import basicConfig, getLogger
-from os import path
-from traceback import format_exc
-import multiprocessing
-
-import aiohttp_cors # type: ignore
-# Partial imports
-from aiohttp import client_exceptions
-from aiohttp.web import Application, Response, Request, get, run_app, static # type: ignore
-from aiohttp_jinja2 import setup as jinja_setup
-
-# local modules
-from .browser import PluginBrowser
-from .helpers import (REMOTE_DEBUGGER_UNIT, csrf_middleware, get_csrf_token,
- mkdir_as_user, get_system_pythonpaths, get_effective_user_id)
-
-from .injector import get_gamepadui_tab, Tab, close_old_tabs
-from .loader import Loader
-from .settings import SettingsManager
-from .updater import Updater
-from .utilities import Utilities
-from .customtypes import UserType
-
-
-basicConfig(
- level=get_log_level(),
- format="[%(module)s][%(levelname)s]: %(message)s"
-)
-
-logger = getLogger("Main")
-plugin_path = path.join(get_privileged_path(), "plugins")
-
-def chown_plugin_dir():
- if not path.exists(plugin_path): # For safety, create the folder before attempting to do anything with it
- mkdir_as_user(plugin_path)
-
- if not chown(plugin_path, UserType.HOST_USER) or not chmod(plugin_path, 555):
- logger.error(f"chown/chmod exited with a non-zero exit code")
-
-if get_chown_plugin_path() == True:
- chown_plugin_dir()
-
-class PluginManager:
- def __init__(self, loop: AbstractEventLoop) -> None:
- self.loop = loop
- self.web_app = Application()
- self.web_app.middlewares.append(csrf_middleware)
- self.cors = aiohttp_cors.setup(self.web_app, defaults={
- "https://steamloopback.host": aiohttp_cors.ResourceOptions(
- expose_headers="*",
- allow_headers="*",
- allow_credentials=True
- )
- })
- self.plugin_loader = Loader(self, plugin_path, self.loop, get_live_reload())
- self.settings = SettingsManager("loader", path.join(get_privileged_path(), "settings"))
- self.plugin_browser = PluginBrowser(plugin_path, self.plugin_loader.plugins, self.plugin_loader, self.settings)
- self.utilities = Utilities(self)
- self.updater = Updater(self)
-
- jinja_setup(self.web_app)
-
- async def startup(_: Application):
- if self.settings.getSetting("cef_forward", False):
- self.loop.create_task(service_start(REMOTE_DEBUGGER_UNIT))
- else:
- self.loop.create_task(service_stop(REMOTE_DEBUGGER_UNIT))
- self.loop.create_task(self.loader_reinjector())
- self.loop.create_task(self.load_plugins())
-
- self.web_app.on_startup.append(startup)
-
- self.loop.set_exception_handler(self.exception_handler)
- self.web_app.add_routes([get("/auth/token", self.get_auth_token)])
-
- for route in list(self.web_app.router.routes()):
- self.cors.add(route) # type: ignore
- self.web_app.add_routes([static("/static", path.join(path.dirname(__file__), '..', 'static'))])
-
- def exception_handler(self, loop: AbstractEventLoop, context: Dict[str, str]):
- if context["message"] == "Unclosed connection":
- return
- loop.default_exception_handler(context)
-
- async def get_auth_token(self, request: Request):
- return Response(text=get_csrf_token())
-
- async def load_plugins(self):
- # await self.wait_for_server()
- logger.debug("Loading plugins")
- self.plugin_loader.import_plugins()
- # await inject_to_tab("SP", "window.syncDeckyPlugins();")
- if self.settings.getSetting("pluginOrder", None) == None:
- self.settings.setSetting("pluginOrder", list(self.plugin_loader.plugins.keys()))
- logger.debug("Did not find pluginOrder setting, set it to default")
-
- async def loader_reinjector(self):
- while True:
- tab = None
- nf = False
- dc = False
- while not tab:
- try:
- tab = await get_gamepadui_tab()
- except (client_exceptions.ClientConnectorError, client_exceptions.ServerDisconnectedError):
- if not dc:
- logger.debug("Couldn't connect to debugger, waiting...")
- dc = True
- pass
- except ValueError:
- if not nf:
- logger.debug("Couldn't find GamepadUI tab, waiting...")
- nf = True
- pass
- if not tab:
- await sleep(5)
- await tab.open_websocket()
- await tab.enable()
- await self.inject_javascript(tab, True)
- try:
- async for msg in tab.listen_for_message():
- # this gets spammed a lot
- if msg.get("method", None) != "Page.navigatedWithinDocument":
- logger.debug("Page event: " + str(msg.get("method", None)))
- if msg.get("method", None) == "Page.domContentEventFired":
- if not await tab.has_global_var("deckyHasLoaded", False):
- await self.inject_javascript(tab)
- if msg.get("method", None) == "Inspector.detached":
- logger.info("CEF has requested that we detach.")
- await tab.close_websocket()
- break
- # If this is a forceful disconnect the loop will just stop without any failure message. In this case, injector.py will handle this for us so we don't need to close the socket.
- # This is because of https://github.com/aio-libs/aiohttp/blob/3ee7091b40a1bc58a8d7846e7878a77640e96996/aiohttp/client_ws.py#L321
- logger.info("CEF has disconnected...")
- # At this point the loop starts again and we connect to the freshly started Steam client once it is ready.
- except Exception:
- logger.error("Exception while reading page events " + format_exc())
- await tab.close_websocket()
- pass
- # while True:
- # await sleep(5)
- # if not await tab.has_global_var("deckyHasLoaded", False):
- # logger.info("Plugin loader isn't present in Steam anymore, reinjecting...")
- # await self.inject_javascript(tab)
-
- async def inject_javascript(self, tab: Tab, first: bool=False, request: Request|None=None):
- logger.info("Loading Decky frontend!")
- try:
- if first:
- if await tab.has_global_var("deckyHasLoaded", False):
- await close_old_tabs()
- await tab.evaluate_js("try{if (window.deckyHasLoaded){setTimeout(() => location.reload(), 100)}else{window.deckyHasLoaded = true;(async()=>{try{while(!window.SP_REACT){await new Promise(r => setTimeout(r, 10))};await import('http://localhost:1337/frontend/index.js')}catch(e){console.error(e)};})();}}catch(e){console.error(e)}", False, False, False)
- except:
- logger.info("Failed to inject JavaScript into tab\n" + format_exc())
- pass
-
- def run(self):
- return run_app(self.web_app, host=get_server_host(), port=get_server_port(), loop=self.loop, access_log=None)
-
-def main():
- if ON_WINDOWS:
- # Fix windows/flask not recognising that .js means 'application/javascript'
- import mimetypes
- mimetypes.add_type('application/javascript', '.js')
-
- # Required for multiprocessing support in frozen files
- multiprocessing.freeze_support()
- else:
- if get_effective_user_id() != 0:
- logger.warning(f"decky is running as an unprivileged user, this is not officially supported and may cause issues")
-
- # Append the loader's plugin path to the recognized python paths
- sys.path.append(path.join(path.dirname(__file__), "..", "plugin"))
-
- # Append the system and user python paths
- sys.path.extend(get_system_pythonpaths())
-
- loop = new_event_loop()
- set_event_loop(loop)
- PluginManager(loop).run()
diff --git a/backend/src/plugin/method_call_request.py b/backend/src/plugin/method_call_request.py
deleted file mode 100644
index cebe34f8..00000000
--- a/backend/src/plugin/method_call_request.py
+++ /dev/null
@@ -1,29 +0,0 @@
-from typing import Any, TypedDict
-from uuid import uuid4
-from asyncio import Event
-
-class SocketResponseDict(TypedDict):
- id: str
- success: bool
- res: Any
-
-class MethodCallResponse:
- def __init__(self, success: bool, result: Any) -> None:
- self.success = success
- self.result = result
-
-class MethodCallRequest:
- def __init__(self) -> None:
- self.id = str(uuid4())
- self.event = Event()
- self.response: MethodCallResponse
-
- def set_result(self, dc: SocketResponseDict):
- self.response = MethodCallResponse(dc["success"], dc["res"])
- self.event.set()
-
- async def wait_for_result(self):
- await self.event.wait()
- if not self.response.success:
- raise Exception(self.response.result)
- return self.response.result \ No newline at end of file
diff --git a/backend/src/plugin/plugin.py b/backend/src/plugin/plugin.py
deleted file mode 100644
index 6c338106..00000000
--- a/backend/src/plugin/plugin.py
+++ /dev/null
@@ -1,84 +0,0 @@
-from asyncio import Task, create_task
-from json import dumps, load, loads
-from logging import getLogger
-from os import path
-from multiprocessing import Process
-
-from .sandboxed_plugin import SandboxedPlugin
-from .method_call_request import MethodCallRequest
-from ..localplatform.localsocket import LocalSocket
-
-from typing import Any, Callable, Coroutine, Dict
-
-class PluginWrapper:
- def __init__(self, file: str, plugin_directory: str, plugin_path: str) -> None:
- self.file = file
- self.plugin_path = plugin_path
- self.plugin_directory = plugin_directory
-
- self.version = None
-
- json = load(open(path.join(plugin_path, plugin_directory, "plugin.json"), "r", encoding="utf-8"))
- if path.isfile(path.join(plugin_path, plugin_directory, "package.json")):
- package_json = load(open(path.join(plugin_path, plugin_directory, "package.json"), "r", encoding="utf-8"))
- self.version = package_json["version"]
-
- self.name = json["name"]
- self.author = json["author"]
- self.flags = json["flags"]
-
- self.passive = not path.isfile(self.file)
-
- self.log = getLogger("plugin")
-
- self.sandboxed_plugin = SandboxedPlugin(self.name, self.passive, self.flags, self.file, self.plugin_directory, self.plugin_path, self.version, self.author)
- #TODO: Maybe make LocalSocket not require on_new_message to make this cleaner
- self._socket = LocalSocket(self.sandboxed_plugin.on_new_message)
- self._listener_task: Task[Any]
- self._method_call_requests: Dict[str, MethodCallRequest] = {}
-
- self.emitted_message_callback: Callable[[Dict[Any, Any]], Coroutine[Any, Any, Any]]
-
- def __str__(self) -> str:
- return self.name
-
- async def _response_listener(self):
- while True:
- try:
- line = await self._socket.read_single_line()
- if line != None:
- res = loads(line)
- if res["id"] == "0":
- create_task(self.emitted_message_callback(res["payload"]))
- else:
- self._method_call_requests.pop(res["id"]).set_result(res)
- except:
- pass
-
- def set_emitted_message_callback(self, callback: Callable[[Dict[Any, Any]], Coroutine[Any, Any, Any]]):
- self.emitted_message_callback = callback
-
- async def execute_method(self, method_name: str, kwargs: Dict[Any, Any]):
- if self.passive:
- raise RuntimeError("This plugin is passive (aka does not implement main.py)")
-
- request = MethodCallRequest()
- await self._socket.get_socket_connection()
- await self._socket.write_single_line(dumps({ "method": method_name, "args": kwargs, "id": request.id }, ensure_ascii=False))
- self._method_call_requests[request.id] = request
-
- return await request.wait_for_result()
-
- def start(self):
- if self.passive:
- return self
- Process(target=self.sandboxed_plugin.initialize, args=[self._socket]).start()
- self._listener_task = create_task(self._response_listener())
- return self
-
- def stop(self):
- self._listener_task.cancel()
- async def _(self: PluginWrapper):
- await self._socket.write_single_line(dumps({ "stop": True }, ensure_ascii=False))
- await self._socket.close_socket_connection()
- create_task(_(self)) \ No newline at end of file
diff --git a/backend/src/plugin/sandboxed_plugin.py b/backend/src/plugin/sandboxed_plugin.py
deleted file mode 100644
index adf9f802..00000000
--- a/backend/src/plugin/sandboxed_plugin.py
+++ /dev/null
@@ -1,138 +0,0 @@
-from os import path, environ
-from signal import SIGINT, signal
-from importlib.util import module_from_spec, spec_from_file_location
-from json import dumps, loads
-from logging import getLogger
-from sys import exit, path as syspath, modules as sysmodules
-from traceback import format_exc
-from asyncio import (get_event_loop, new_event_loop,
- set_event_loop, sleep)
-
-from .method_call_request import SocketResponseDict
-from ..localplatform.localsocket import LocalSocket
-from ..localplatform.localplatform import setgid, setuid, get_username, get_home_path
-from ..customtypes import UserType
-from .. import helpers
-
-from typing import Any, Dict, List
-
-class SandboxedPlugin:
- def __init__(self,
- name: str,
- passive: bool,
- flags: List[str],
- file: str,
- plugin_directory: str,
- plugin_path: str,
- version: str|None,
- author: str) -> None:
- self.name = name
- self.passive = passive
- self.flags = flags
- self.file = file
- self.plugin_path = plugin_path
- self.plugin_directory = plugin_directory
- self.version = version
- self.author = author
-
- self.log = getLogger("plugin")
-
- def initialize(self, socket: LocalSocket):
- self._socket = socket
-
- try:
- signal(SIGINT, lambda s, f: exit(0))
-
- set_event_loop(new_event_loop())
- if self.passive:
- return
- setgid(UserType.ROOT if "root" in self.flags else UserType.HOST_USER)
- setuid(UserType.ROOT if "root" in self.flags else UserType.HOST_USER)
- # export a bunch of environment variables to help plugin developers
- environ["HOME"] = get_home_path(UserType.ROOT if "root" in self.flags else UserType.HOST_USER)
- environ["USER"] = "root" if "root" in self.flags else get_username()
- environ["DECKY_VERSION"] = helpers.get_loader_version()
- environ["DECKY_USER"] = get_username()
- environ["DECKY_USER_HOME"] = helpers.get_home_path()
- environ["DECKY_HOME"] = helpers.get_homebrew_path()
- environ["DECKY_PLUGIN_SETTINGS_DIR"] = path.join(environ["DECKY_HOME"], "settings", self.plugin_directory)
- helpers.mkdir_as_user(path.join(environ["DECKY_HOME"], "settings"))
- helpers.mkdir_as_user(environ["DECKY_PLUGIN_SETTINGS_DIR"])
- environ["DECKY_PLUGIN_RUNTIME_DIR"] = path.join(environ["DECKY_HOME"], "data", self.plugin_directory)
- helpers.mkdir_as_user(path.join(environ["DECKY_HOME"], "data"))
- helpers.mkdir_as_user(environ["DECKY_PLUGIN_RUNTIME_DIR"])
- environ["DECKY_PLUGIN_LOG_DIR"] = path.join(environ["DECKY_HOME"], "logs", self.plugin_directory)
- helpers.mkdir_as_user(path.join(environ["DECKY_HOME"], "logs"))
- helpers.mkdir_as_user(environ["DECKY_PLUGIN_LOG_DIR"])
- environ["DECKY_PLUGIN_DIR"] = path.join(self.plugin_path, self.plugin_directory)
- environ["DECKY_PLUGIN_NAME"] = self.name
- if self.version:
- environ["DECKY_PLUGIN_VERSION"] = self.version
- environ["DECKY_PLUGIN_AUTHOR"] = self.author
-
- # append the plugin's `py_modules` to the recognized python paths
- syspath.append(path.join(environ["DECKY_PLUGIN_DIR"], "py_modules"))
-
- #TODO: FIX IN A LESS CURSED WAY
- keys = [key.replace("src.", "") for key in sysmodules if key.startswith("src.")]
- for key in keys:
- sysmodules[key] = sysmodules["src"].__dict__[key]
-
- spec = spec_from_file_location("_", self.file)
- assert spec is not None
- module = module_from_spec(spec)
- assert spec.loader is not None
- spec.loader.exec_module(module)
- self.Plugin = module.Plugin
-
- setattr(self.Plugin, "emit_message", self.emit_message)
- #TODO: Find how to put emit_message on global namespace so it doesn't pollute Plugin
-
- if hasattr(self.Plugin, "_migration"):
- get_event_loop().run_until_complete(self.Plugin._migration(self.Plugin))
- if hasattr(self.Plugin, "_main"):
- get_event_loop().create_task(self.Plugin._main(self.Plugin))
- get_event_loop().create_task(socket.setup_server())
- get_event_loop().run_forever()
- except:
- self.log.error("Failed to start " + self.name + "!\n" + format_exc())
- exit(0)
-
- async def _unload(self):
- try:
- self.log.info("Attempting to unload with plugin " + self.name + "'s \"_unload\" function.\n")
- if hasattr(self.Plugin, "_unload"):
- await self.Plugin._unload(self.Plugin)
- self.log.info("Unloaded " + self.name + "\n")
- else:
- self.log.info("Could not find \"_unload\" in " + self.name + "'s main.py" + "\n")
- except:
- self.log.error("Failed to unload " + self.name + "!\n" + format_exc())
- exit(0)
-
- async def on_new_message(self, message : str) -> str|None:
- data = loads(message)
-
- if "stop" in data:
- self.log.info("Calling Loader unload function.")
- await self._unload()
- get_event_loop().stop()
- while get_event_loop().is_running():
- await sleep(0)
- get_event_loop().close()
- raise Exception("Closing message listener")
-
- d: SocketResponseDict = {"res": None, "success": True, "id": data["id"]}
- try:
- d["res"] = await getattr(self.Plugin, data["method"])(self.Plugin, **data["args"])
- except Exception as e:
- d["res"] = str(e)
- d["success"] = False
- finally:
- return dumps(d, ensure_ascii=False)
-
- async def emit_message(self, message: Dict[Any, Any]):
- await self._socket.write_single_line_server(dumps({
- "id": "0",
- "payload": message
- })) \ No newline at end of file
diff --git a/backend/src/settings.py b/backend/src/settings.py
deleted file mode 100644
index c0f2b90c..00000000
--- a/backend/src/settings.py
+++ /dev/null
@@ -1,60 +0,0 @@
-from json import dump, load
-from os import mkdir, path, listdir, rename
-from typing import Any, Dict
-from .localplatform.localplatform import chown, folder_owner, get_chown_plugin_path
-from .customtypes import UserType
-
-from .helpers import get_homebrew_path
-
-
-class SettingsManager:
- def __init__(self, name: str, settings_directory: str | None = None) -> None:
- wrong_dir = get_homebrew_path()
- if settings_directory == None:
- settings_directory = path.join(wrong_dir, "settings")
-
- self.path = path.join(settings_directory, name + ".json")
-
- #Create the folder with the correct permission
- if not path.exists(settings_directory):
- mkdir(settings_directory)
-
- #Copy all old settings file in the root directory to the correct folder
- for file in listdir(wrong_dir):
- if file.endswith(".json"):
- rename(path.join(wrong_dir,file),
- path.join(settings_directory, file))
- self.path = path.join(settings_directory, name + ".json")
-
-
- #If the owner of the settings directory is not the user, then set it as the user:
- expected_user = UserType.HOST_USER if get_chown_plugin_path() else UserType.ROOT
- if folder_owner(settings_directory) != expected_user:
- chown(settings_directory, expected_user, False)
-
- self.settings: Dict[str, Any] = {}
-
- try:
- open(self.path, "x", encoding="utf-8")
- except FileExistsError as _:
- self.read()
- pass
-
- def read(self):
- try:
- with open(self.path, "r", encoding="utf-8") as file:
- self.settings = load(file)
- except Exception as e:
- print(e)
- pass
-
- def commit(self):
- with open(self.path, "w+", encoding="utf-8") as file:
- dump(self.settings, file, indent=4, ensure_ascii=False)
-
- def getSetting(self, key: str, default: Any = None) -> Any:
- return self.settings.get(key, default)
-
- def setSetting(self, key: str, value: Any) -> Any:
- self.settings[key] = value
- self.commit()
diff --git a/backend/src/updater.py b/backend/src/updater.py
deleted file mode 100644
index f8aef429..00000000
--- a/backend/src/updater.py
+++ /dev/null
@@ -1,238 +0,0 @@
-from __future__ import annotations
-import os
-import shutil
-from asyncio import sleep
-from json.decoder import JSONDecodeError
-from logging import getLogger
-from os import getcwd, path, remove
-from typing import TYPE_CHECKING, List, TypedDict
-if TYPE_CHECKING:
- from .main import PluginManager
-from .localplatform.localplatform import chmod, service_restart, ON_LINUX, get_keep_systemd_service, get_selinux
-
-from aiohttp import ClientSession, web
-
-from . import helpers
-from .injector import get_gamepadui_tab
-from .settings import SettingsManager
-
-logger = getLogger("Updater")
-
-class RemoteVerAsset(TypedDict):
- name: str
- browser_download_url: str
-class RemoteVer(TypedDict):
- tag_name: str
- prerelease: bool
- assets: List[RemoteVerAsset]
-
-class Updater:
- def __init__(self, context: PluginManager) -> None:
- self.context = context
- self.settings = self.context.settings
- # Exposes updater methods to frontend
- self.updater_methods = {
- "get_branch": self._get_branch,
- "get_version": self.get_version,
- "do_update": self.do_update,
- "do_restart": self.do_restart,
- "check_for_updates": self.check_for_updates
- }
- self.remoteVer: RemoteVer | None = None
- self.allRemoteVers: List[RemoteVer] = []
- self.localVer = helpers.get_loader_version()
-
- try:
- self.currentBranch = self.get_branch(self.context.settings)
- except:
- self.currentBranch = 0
- logger.error("Current branch could not be determined, defaulting to \"Stable\"")
-
- if context:
- context.web_app.add_routes([
- web.post("/updater/{method_name}", self._handle_server_method_call)
- ])
- context.loop.create_task(self.version_reloader())
-
- async def _handle_server_method_call(self, request: web.Request):
- method_name = request.match_info["method_name"]
- try:
- args = await request.json()
- except JSONDecodeError:
- args = {}
- res = {}
- try:
- r = await self.updater_methods[method_name](**args) # type: ignore
- res["result"] = r
- res["success"] = True
- except Exception as e:
- res["result"] = str(e)
- res["success"] = False
- return web.json_response(res)
-
- def get_branch(self, manager: SettingsManager):
- ver = manager.getSetting("branch", -1)
- logger.debug("current branch: %i" % ver)
- if ver == -1:
- logger.info("Current branch is not set, determining branch from version...")
- if self.localVer.startswith("v") and "-pre" in self.localVer:
- logger.info("Current version determined to be pre-release")
- manager.setSetting('branch', 1)
- return 1
- else:
- logger.info("Current version determined to be stable")
- manager.setSetting('branch', 0)
- return 0
- return ver
-
- async def _get_branch(self, manager: SettingsManager):
- return self.get_branch(manager)
-
- # retrieve relevant service file's url for each branch
- def get_service_url(self):
- logger.debug("Getting service URL")
- branch = self.get_branch(self.context.settings)
- match branch:
- case 0:
- url = "https://raw.githubusercontent.com/SteamDeckHomebrew/decky-loader/main/dist/plugin_loader-release.service"
- case 1 | 2:
- url = "https://raw.githubusercontent.com/SteamDeckHomebrew/decky-loader/main/dist/plugin_loader-prerelease.service"
- case _:
- logger.error("You have an invalid branch set... Defaulting to prerelease service, please send the logs to the devs!")
- url = "https://raw.githubusercontent.com/SteamDeckHomebrew/decky-loader/main/dist/plugin_loader-prerelease.service"
- return str(url)
-
- async def get_version(self):
- return {
- "current": self.localVer,
- "remote": self.remoteVer,
- "all": self.allRemoteVers,
- "updatable": self.localVer != "unknown"
- }
-
- async def check_for_updates(self):
- logger.debug("checking for updates")
- selectedBranch = self.get_branch(self.context.settings)
- async with ClientSession() as web:
- async with web.request("GET", "https://api.github.com/repos/SteamDeckHomebrew/decky-loader/releases", ssl=helpers.get_ssl_context()) as res:
- remoteVersions: List[RemoteVer] = await res.json()
- if selectedBranch == 0:
- logger.debug("release type: release")
- remoteVersions = list(filter(lambda ver: ver["tag_name"].startswith("v") and not ver["prerelease"] and not ver["tag_name"].find("-pre") > 0 and ver["tag_name"], remoteVersions))
- elif selectedBranch == 1:
- logger.debug("release type: pre-release")
- remoteVersions = list(filter(lambda ver:ver["tag_name"].startswith("v"), remoteVersions))
- else:
- logger.error("release type: NOT FOUND")
- raise ValueError("no valid branch found")
- self.allRemoteVers = remoteVersions
- logger.debug("determining release type to find, branch is %i" % selectedBranch)
- if selectedBranch == 0:
- logger.debug("release type: release")
- self.remoteVer = next(filter(lambda ver: ver["tag_name"].startswith("v") and not ver["prerelease"] and not ver["tag_name"].find("-pre") > 0 and ver["tag_name"], remoteVersions), None)
- elif selectedBranch == 1:
- logger.debug("release type: pre-release")
- self.remoteVer = next(filter(lambda ver:ver["tag_name"].startswith("v"), remoteVersions), None)
- else:
- logger.error("release type: NOT FOUND")
- raise ValueError("no valid branch found")
- logger.info("Updated remote version information")
- tab = await get_gamepadui_tab()
- await tab.evaluate_js(f"window.DeckyPluginLoader.notifyUpdates()", False, True, False)
- return await self.get_version()
-
- async def version_reloader(self):
- await sleep(30)
- while True:
- try:
- await self.check_for_updates()
- except:
- pass
- await sleep(60 * 60 * 6) # 6 hours
-
- async def do_update(self):
- logger.debug("Starting update.")
- try:
- assert self.remoteVer
- except AssertionError:
- logger.error("Unable to update as remoteVer is missing")
- return
-
- version = self.remoteVer["tag_name"]
- download_url = None
- download_filename = "PluginLoader" if ON_LINUX else "PluginLoader.exe"
- download_temp_filename = download_filename + ".new"
-
- for x in self.remoteVer["assets"]:
- if x["name"] == download_filename:
- download_url = x["browser_download_url"]
- break
-
- if download_url == None:
- raise Exception("Download url not found")
-
- service_url = self.get_service_url()
- logger.debug("Retrieved service URL")
-
- tab = await get_gamepadui_tab()
- await tab.open_websocket()
- async with ClientSession() as web:
- if ON_LINUX and not get_keep_systemd_service():
- logger.debug("Downloading systemd service")
- # download the relevant systemd service depending upon branch
- async with web.request("GET", service_url, ssl=helpers.get_ssl_context(), allow_redirects=True) as res:
- logger.debug("Downloading service file")
- data = await res.content.read()
- logger.debug(str(data))
- service_file_path = path.join(getcwd(), "plugin_loader.service")
- try:
- with open(path.join(getcwd(), "plugin_loader.service"), "wb") as out:
- out.write(data)
- except Exception as e:
- logger.error(f"Error at %s", exc_info=e)
- with open(path.join(getcwd(), "plugin_loader.service"), "r", encoding="utf-8") as service_file:
- service_data = service_file.read()
- service_data = service_data.replace("${HOMEBREW_FOLDER}", helpers.get_homebrew_path())
- with open(path.join(getcwd(), "plugin_loader.service"), "w", encoding="utf-8") as service_file:
- service_file.write(service_data)
-
- logger.debug("Saved service file")
- logger.debug("Copying service file over current file.")
- shutil.copy(service_file_path, "/etc/systemd/system/plugin_loader.service")
- if not os.path.exists(path.join(getcwd(), ".systemd")):
- os.mkdir(path.join(getcwd(), ".systemd"))
- shutil.move(service_file_path, path.join(getcwd(), ".systemd")+"/plugin_loader.service")
-
- logger.debug("Downloading binary")
- async with web.request("GET", download_url, ssl=helpers.get_ssl_context(), allow_redirects=True) as res:
- total = int(res.headers.get('content-length', 0))
- with open(path.join(getcwd(), download_temp_filename), "wb") as out:
- progress = 0
- raw = 0
- async for c in res.content.iter_chunked(512):
- out.write(c)
- raw += len(c)
- new_progress = round((raw / total) * 100)
- if progress != new_progress:
- self.context.loop.create_task(tab.evaluate_js(f"window.DeckyUpdater.updateProgress({new_progress})", False, False, False))
- progress = new_progress
-
- with open(path.join(getcwd(), ".loader.version"), "w", encoding="utf-8") as out:
- out.write(version)
-
- if ON_LINUX:
- remove(path.join(getcwd(), download_filename))
- shutil.move(path.join(getcwd(), download_temp_filename), path.join(getcwd(), download_filename))
- chmod(path.join(getcwd(), download_filename), 777, False)
- if get_selinux():
- from asyncio.subprocess import create_subprocess_exec
- process = await create_subprocess_exec("chcon", "-t", "bin_t", path.join(getcwd(), download_filename))
- logger.info(f"Setting the executable flag with chcon returned {await process.wait()}")
-
- logger.info("Updated loader installation.")
- await tab.evaluate_js("window.DeckyUpdater.finish()", False, False)
- await self.do_restart()
- await tab.close_websocket()
-
- async def do_restart(self):
- await service_restart("plugin_loader")
diff --git a/backend/src/utilities.py b/backend/src/utilities.py
deleted file mode 100644
index f04ed371..00000000
--- a/backend/src/utilities.py
+++ /dev/null
@@ -1,373 +0,0 @@
-from __future__ import annotations
-from os import stat_result
-import uuid
-from json.decoder import JSONDecodeError
-from os.path import splitext
-import re
-from traceback import format_exc
-from stat import FILE_ATTRIBUTE_HIDDEN # type: ignore
-
-from asyncio import StreamReader, StreamWriter, start_server, gather, open_connection
-from aiohttp import ClientSession, web
-from typing import TYPE_CHECKING, Callable, Coroutine, Dict, Any, List, TypedDict
-
-from logging import getLogger
-from pathlib import Path
-
-from .browser import PluginInstallRequest, PluginInstallType
-if TYPE_CHECKING:
- from .main import PluginManager
-from .injector import inject_to_tab, get_gamepadui_tab, close_old_tabs, get_tab
-from .localplatform.localplatform import ON_WINDOWS
-from . import helpers
-from .localplatform.localplatform import service_stop, service_start, get_home_path, get_username
-
-class FilePickerObj(TypedDict):
- file: Path
- filest: stat_result
- is_dir: bool
-
-class Utilities:
- def __init__(self, context: PluginManager) -> None:
- self.context = context
- self.util_methods: Dict[str, Callable[..., Coroutine[Any, Any, Any]]] = {
- "ping": self.ping,
- "http_request": self.http_request,
- "install_plugin": self.install_plugin,
- "install_plugins": self.install_plugins,
- "cancel_plugin_install": self.cancel_plugin_install,
- "confirm_plugin_install": self.confirm_plugin_install,
- "uninstall_plugin": self.uninstall_plugin,
- "execute_in_tab": self.execute_in_tab,
- "inject_css_into_tab": self.inject_css_into_tab,
- "remove_css_from_tab": self.remove_css_from_tab,
- "allow_remote_debugging": self.allow_remote_debugging,
- "disallow_remote_debugging": self.disallow_remote_debugging,
- "set_setting": self.set_setting,
- "get_setting": self.get_setting,
- "filepicker_ls": self.filepicker_ls,
- "disable_rdt": self.disable_rdt,
- "enable_rdt": self.enable_rdt,
- "get_tab_id": self.get_tab_id,
- "get_user_info": self.get_user_info,
- }
-
- self.logger = getLogger("Utilities")
-
- self.rdt_proxy_server = None
- self.rdt_script_id = None
- self.rdt_proxy_task = None
-
- if context:
- context.web_app.add_routes([
- web.post("/methods/{method_name}", self._handle_server_method_call)
- ])
-
- async def _handle_server_method_call(self, request: web.Request):
- method_name = request.match_info["method_name"]
- try:
- args = await request.json()
- except JSONDecodeError:
- args = {}
- res = {}
- try:
- r = await self.util_methods[method_name](**args)
- res["result"] = r
- res["success"] = True
- except Exception as e:
- res["result"] = str(e)
- res["success"] = False
- return web.json_response(res)
-
- async def install_plugin(self, artifact: str="", name: str="No name", version: str="dev", hash: str="", install_type: PluginInstallType=PluginInstallType.INSTALL):
- return await self.context.plugin_browser.request_plugin_install(
- artifact=artifact,
- name=name,
- version=version,
- hash=hash,
- install_type=install_type
- )
-
- async def install_plugins(self, requests: List[PluginInstallRequest]):
- return await self.context.plugin_browser.request_multiple_plugin_installs(
- requests=requests
- )
-
- async def confirm_plugin_install(self, request_id: str):
- return await self.context.plugin_browser.confirm_plugin_install(request_id)
-
- async def cancel_plugin_install(self, request_id: str):
- return self.context.plugin_browser.cancel_plugin_install(request_id)
-
- async def uninstall_plugin(self, name: str):
- return await self.context.plugin_browser.uninstall_plugin(name)
-
- async def http_request(self, method: str="", url: str="", **kwargs: Any):
- async with ClientSession() as web:
- res = await web.request(method, url, ssl=helpers.get_ssl_context(), **kwargs)
- text = await res.text()
- return {
- "status": res.status,
- "headers": dict(res.headers),
- "body": text
- }
-
- async def ping(self, **kwargs: Any):
- return "pong"
-
- async def execute_in_tab(self, tab: str, run_async: bool, code: str):
- try:
- result = await inject_to_tab(tab, code, run_async)
- assert result
- if "exceptionDetails" in result["result"]:
- return {
- "success": False,
- "result": result["result"]
- }
-
- return {
- "success": True,
- "result": result["result"]["result"].get("value")
- }
- except Exception as e:
- return {
- "success": False,
- "result": e
- }
-
- async def inject_css_into_tab(self, tab: str, style: str):
- try:
- css_id = str(uuid.uuid4())
-
- result = await inject_to_tab(tab,
- f"""
- (function() {{
- const style = document.createElement('style');
- style.id = "{css_id}";
- document.head.append(style);
- style.textContent = `{style}`;
- }})()
- """, False)
-
- if result and "exceptionDetails" in result["result"]:
- return {
- "success": False,
- "result": result["result"]
- }
-
- return {
- "success": True,
- "result": css_id
- }
- except Exception as e:
- return {
- "success": False,
- "result": e
- }
-
- async def remove_css_from_tab(self, tab: str, css_id: str):
- try:
- result = await inject_to_tab(tab,
- f"""
- (function() {{
- let style = document.getElementById("{css_id}");
-
- if (style.nodeName.toLowerCase() == 'style')
- style.parentNode.removeChild(style);
- }})()
- """, False)
-
- if result and "exceptionDetails" in result["result"]:
- return {
- "success": False,
- "result": result
- }
-
- return {
- "success": True
- }
- except Exception as e:
- return {
- "success": False,
- "result": e
- }
-
- async def get_setting(self, key: str, default: Any):
- return self.context.settings.getSetting(key, default)
-
- async def set_setting(self, key: str, value: Any):
- return self.context.settings.setSetting(key, value)
-
- async def allow_remote_debugging(self):
- await service_start(helpers.REMOTE_DEBUGGER_UNIT)
- return True
-
- async def disallow_remote_debugging(self):
- await service_stop(helpers.REMOTE_DEBUGGER_UNIT)
- return True
-
- async def filepicker_ls(self,
- path : str | None = None,
- include_files: bool = True,
- include_folders: bool = True,
- include_ext: list[str] = [],
- include_hidden: bool = False,
- order_by: str = "name_asc",
- filter_for: str | None = None,
- page: int = 1,
- max: int = 1000):
-
- if path == None:
- path = get_home_path()
-
- path_obj = Path(path).resolve()
-
- files: List[FilePickerObj] = []
- folders: List[FilePickerObj] = []
-
- #Resolving all files/folders in the requested directory
- for file in path_obj.iterdir():
- if file.exists():
- filest = file.stat()
- is_hidden = file.name.startswith('.')
- if ON_WINDOWS and not is_hidden:
- is_hidden = bool(filest.st_file_attributes & FILE_ATTRIBUTE_HIDDEN) # type: ignore
- if include_folders and file.is_dir():
- if (is_hidden and include_hidden) or not is_hidden:
- folders.append({"file": file, "filest": filest, "is_dir": True})
- elif include_files:
- # Handle requested extensions if present
- if len(include_ext) == 0 or 'all_files' in include_ext \
- or splitext(file.name)[1].lstrip('.') in include_ext:
- if (is_hidden and include_hidden) or not is_hidden:
- files.append({"file": file, "filest": filest, "is_dir": False})
- # Filter logic
- if filter_for is not None:
- try:
- if re.compile(filter_for):
- files = list(filter(lambda file: re.search(filter_for, file["file"].name) != None, files))
- except re.error:
- files = list(filter(lambda file: file["file"].name.find(filter_for) != -1, files))
-
- # Ordering logic
- ord_arg = order_by.split("_")
- ord = ord_arg[0]
- rev = True if ord_arg[1] == "asc" else False
- match ord:
- case 'name':
- files.sort(key=lambda x: x['file'].name.casefold(), reverse = rev)
- folders.sort(key=lambda x: x['file'].name.casefold(), reverse = rev)
- case 'modified':
- files.sort(key=lambda x: x['filest'].st_mtime, reverse = not rev)
- folders.sort(key=lambda x: x['filest'].st_mtime, reverse = not rev)
- case 'created':
- files.sort(key=lambda x: x['filest'].st_ctime, reverse = not rev)
- folders.sort(key=lambda x: x['filest'].st_ctime, reverse = not rev)
- case 'size':
- files.sort(key=lambda x: x['filest'].st_size, reverse = not rev)
- # Folders has no file size, order by name instead
- folders.sort(key=lambda x: x['file'].name.casefold())
- case _:
- files.sort(key=lambda x: x['file'].name.casefold(), reverse = rev)
- folders.sort(key=lambda x: x['file'].name.casefold(), reverse = rev)
-
- #Constructing the final file list, folders first
- all = [{
- "isdir": x['is_dir'],
- "name": str(x['file'].name),
- "realpath": str(x['file']),
- "size": x['filest'].st_size,
- "modified": x['filest'].st_mtime,
- "created": x['filest'].st_ctime,
- } for x in folders + files ]
-
- return {
- "realpath": str(path),
- "files": all[(page-1)*max:(page)*max],
- "total": len(all),
- }
-
-
- # Based on https://stackoverflow.com/a/46422554/13174603
- def start_rdt_proxy(self, ip: str, port: int):
- async def pipe(reader: StreamReader, writer: StreamWriter):
- try:
- while not reader.at_eof():
- writer.write(await reader.read(2048))
- finally:
- writer.close()
- async def handle_client(local_reader: StreamReader, local_writer: StreamWriter):
- try:
- remote_reader, remote_writer = await open_connection(
- ip, port)
- pipe1 = pipe(local_reader, remote_writer)
- pipe2 = pipe(remote_reader, local_writer)
- await gather(pipe1, pipe2)
- finally:
- local_writer.close()
-
- self.rdt_proxy_server = start_server(handle_client, "127.0.0.1", port)
- self.rdt_proxy_task = self.context.loop.create_task(self.rdt_proxy_server)
-
- def stop_rdt_proxy(self):
- if self.rdt_proxy_server != None:
- self.rdt_proxy_server.close()
- if self.rdt_proxy_task:
- self.rdt_proxy_task.cancel()
-
- async def _enable_rdt(self):
- # TODO un-hardcode port
- try:
- self.stop_rdt_proxy()
- ip = self.context.settings.getSetting("developer.rdt.ip", None)
-
- if ip != None:
- self.logger.info("Connecting to React DevTools at " + ip)
- async with ClientSession() as web:
- res = await web.request("GET", "http://" + ip + ":8097", ssl=helpers.get_ssl_context())
- script = """
- if (!window.deckyHasConnectedRDT) {
- window.deckyHasConnectedRDT = true;
- // This fixes the overlay when hovering over an element in RDT
- Object.defineProperty(window, '__REACT_DEVTOOLS_TARGET_WINDOW__', {
- enumerable: true,
- configurable: true,
- get: function() {
- return (GamepadNavTree?.m_context?.m_controller || FocusNavController)?.m_ActiveContext?.ActiveWindow || window;
- }
- });
- """ + await res.text() + "\n}"
- if res.status != 200:
- self.logger.error("Failed to connect to React DevTools at " + ip)
- return False
- self.start_rdt_proxy(ip, 8097)
- self.logger.info("Connected to React DevTools, loading script")
- tab = await get_gamepadui_tab()
- # RDT needs to load before React itself to work.
- await close_old_tabs()
- result = await tab.reload_and_evaluate(script)
- self.logger.info(result)
-
- except Exception:
- self.logger.error("Failed to connect to React DevTools")
- self.logger.error(format_exc())
-
- async def enable_rdt(self):
- self.context.loop.create_task(self._enable_rdt())
-
- async def disable_rdt(self):
- self.logger.info("Disabling React DevTools")
- tab = await get_gamepadui_tab()
- self.rdt_script_id = None
- await close_old_tabs()
- await tab.evaluate_js("location.reload();", False, True, False)
- self.logger.info("React DevTools disabled")
-
- async def get_user_info(self) -> Dict[str, str]:
- return {
- "username": get_username(),
- "path": get_home_path()
- }
-
- async def get_tab_id(self, name: str):
- return (await get_tab(name)).id