diff options
| author | AAGaming <aagaming@riseup.net> | 2023-08-26 22:06:01 -0400 |
|---|---|---|
| committer | marios8543 <marios8543@gmail.com> | 2023-10-17 17:08:23 +0300 |
| commit | e2d708a6af0ec75c557b11d3a442af57240302b4 (patch) | |
| tree | 4d784163cc7fe0e7eb12a0a5de75aae1e2a64501 /backend | |
| parent | 1e1e82ed71524ad5cb879e80fc4f7615d59fdba2 (diff) | |
| download | decky-loader-e2d708a6af0ec75c557b11d3a442af57240302b4.tar.gz decky-loader-e2d708a6af0ec75c557b11d3a442af57240302b4.zip | |
begin adding static types to backend code
Diffstat (limited to 'backend')
| -rw-r--r-- | backend/browser.py | 62 | ||||
| -rw-r--r-- | backend/helpers.py | 55 | ||||
| -rw-r--r-- | backend/injector.py | 84 | ||||
| -rw-r--r-- | backend/loader.py | 69 | ||||
| -rw-r--r-- | backend/localplatformlinux.py | 16 | ||||
| -rw-r--r-- | backend/main.py | 10 | ||||
| -rw-r--r-- | backend/plugin.py | 20 | ||||
| -rw-r--r-- | backend/settings.py | 11 | ||||
| -rw-r--r-- | backend/utilities.py | 15 |
9 files changed, 189 insertions, 153 deletions
diff --git a/backend/browser.py b/backend/browser.py index ce9b3dd7..358c05f9 100644 --- a/backend/browser.py +++ b/backend/browser.py @@ -4,53 +4,70 @@ import json # from pprint import pformat # Partial imports -from aiohttp import ClientSession, web -from asyncio import get_event_loop, sleep -from concurrent.futures import ProcessPoolExecutor +from aiohttp import ClientSession +from asyncio import sleep from hashlib import sha256 from io import BytesIO from logging import getLogger -from os import R_OK, W_OK, path, rename, listdir, access, mkdir +from os import R_OK, W_OK, path, listdir, access, mkdir from shutil import rmtree from time import time from zipfile import ZipFile from localplatform import chown, chmod +from enum import IntEnum +from typing import Dict, List, TypedDict # Local modules -from helpers import get_ssl_context, download_remote_binary_to_path -from injector import get_gamepadui_tab +from .loader import Loader, Plugins +from .helpers import get_ssl_context, download_remote_binary_to_path +from .settings import SettingsManager +from .injector import get_gamepadui_tab logger = getLogger("Browser") +class PluginInstallType(IntEnum): + INSTALL = 0 + REINSTALL = 1 + UPDATE = 2 + +class PluginInstallRequest(TypedDict): + name: str + artifact: str + version: str + hash: str + install_type: PluginInstallType + class PluginInstallContext: - def __init__(self, artifact, name, version, hash) -> None: + def __init__(self, artifact: str, name: str, version: str, hash: str) -> None: self.artifact = artifact self.name = name self.version = version self.hash = hash class PluginBrowser: - def __init__(self, plugin_path, plugins, loader, settings) -> None: + def __init__(self, plugin_path: str, plugins: Plugins, loader: Loader, settings: SettingsManager) -> None: self.plugin_path = plugin_path self.plugins = plugins self.loader = loader self.settings = settings - self.install_requests = {} + self.install_requests: Dict[str, PluginInstallContext | List[PluginInstallContext]] = {} - def _unzip_to_plugin_dir(self, zip, name, hash): + def _unzip_to_plugin_dir(self, zip: BytesIO, name: str, hash: str): zip_hash = sha256(zip.getbuffer()).hexdigest() if hash and (zip_hash != hash): return False zip_file = ZipFile(zip) zip_file.extractall(self.plugin_path) - plugin_dir = path.join(self.plugin_path, self.find_plugin_folder(name)) + plugin_folder = self.find_plugin_folder(name) + assert plugin_folder is not None + plugin_dir = path.join(self.plugin_path, plugin_folder) if not chown(plugin_dir) or not chmod(plugin_dir, 555): logger.error(f"chown/chmod exited with a non-zero exit code") return False return True - async def _download_remote_binaries_for_plugin_with_name(self, pluginBasePath): + async def _download_remote_binaries_for_plugin_with_name(self, pluginBasePath: str): rv = False try: packageJsonPath = path.join(pluginBasePath, 'package.json') @@ -91,7 +108,7 @@ class PluginBrowser: return rv """Return the filename (only) for the specified plugin""" - def find_plugin_folder(self, name): + def find_plugin_folder(self, name: str) -> str | None: for folder in listdir(self.plugin_path): try: with open(path.join(self.plugin_path, folder, 'plugin.json'), "r", encoding="utf-8") as f: @@ -102,11 +119,13 @@ class PluginBrowser: except: logger.debug(f"skipping {folder}") - async def uninstall_plugin(self, name): + async def uninstall_plugin(self, name: str): if self.loader.watcher: self.loader.watcher.disabled = True tab = await get_gamepadui_tab() - plugin_dir = path.join(self.plugin_path, self.find_plugin_folder(name)) + plugin_folder = self.find_plugin_folder(name) + assert plugin_folder is not None + plugin_dir = path.join(self.plugin_path, ) try: logger.info("uninstalling " + name) logger.info(" at dir " + plugin_dir) @@ -133,7 +152,7 @@ class PluginBrowser: if self.loader.watcher: self.loader.watcher.disabled = False - async def _install(self, artifact, name, version, hash): + async def _install(self, artifact: str, name: str, version: str, hash: str): # Will be set later in code res_zip = None @@ -185,6 +204,7 @@ class PluginBrowser: ret = self._unzip_to_plugin_dir(res_zip, name, hash) if ret: plugin_folder = self.find_plugin_folder(name) + assert plugin_folder is not None plugin_dir = path.join(self.plugin_path, plugin_folder) ret = await self._download_remote_binaries_for_plugin_with_name(plugin_dir) if ret: @@ -206,14 +226,14 @@ class PluginBrowser: if self.loader.watcher: self.loader.watcher.disabled = False - async def request_plugin_install(self, artifact, name, version, hash, install_type): + async def request_plugin_install(self, artifact: str, name: str, version: str, hash: str, install_type: PluginInstallType): request_id = str(time()) self.install_requests[request_id] = PluginInstallContext(artifact, name, version, hash) tab = await get_gamepadui_tab() await tab.open_websocket() await tab.evaluate_js(f"DeckyPluginLoader.addPluginInstallPrompt('{name}', '{version}', '{request_id}', '{hash}', {install_type})") - async def request_multiple_plugin_installs(self, requests): + async def request_multiple_plugin_installs(self, requests: List[PluginInstallRequest]): request_id = str(time()) self.install_requests[request_id] = [PluginInstallContext(req['artifact'], req['name'], req['version'], req['hash']) for req in requests] js_requests_parameter = ','.join([ @@ -224,17 +244,17 @@ class PluginBrowser: await tab.open_websocket() await tab.evaluate_js(f"DeckyPluginLoader.addMultiplePluginsInstallPrompt('{request_id}', [{js_requests_parameter}])") - async def confirm_plugin_install(self, request_id): + async def confirm_plugin_install(self, request_id: str): requestOrRequests = self.install_requests.pop(request_id) if isinstance(requestOrRequests, list): [await self._install(req.artifact, req.name, req.version, req.hash) for req in requestOrRequests] else: await self._install(requestOrRequests.artifact, requestOrRequests.name, requestOrRequests.version, requestOrRequests.hash) - def cancel_plugin_install(self, request_id): + def cancel_plugin_install(self, request_id: str): self.install_requests.pop(request_id) - def cleanup_plugin_settings(self, name): + def cleanup_plugin_settings(self, name: str): """Removes any settings related to a plugin. Propably called when a plugin is uninstalled. Args: diff --git a/backend/helpers.py b/backend/helpers.py index a1877fb8..4036db85 100644 --- a/backend/helpers.py +++ b/backend/helpers.py @@ -2,13 +2,13 @@ import re import ssl import uuid import os -import sys import subprocess from hashlib import sha256 from io import BytesIO import certifi -from aiohttp.web import Response, middleware +from aiohttp.web import Request, Response, middleware +from aiohttp.typedefs import Handler from aiohttp import ClientSession import localplatform from customtypes import UserType @@ -31,17 +31,17 @@ def get_csrf_token(): return csrf_token @middleware -async def csrf_middleware(request, handler): +async def csrf_middleware(request: Request, handler: Handler): if str(request.method) == "OPTIONS" or request.headers.get('Authentication') == csrf_token or str(request.rel_url) == "/auth/token" or str(request.rel_url).startswith("/plugins/load_main/") or str(request.rel_url).startswith("/static/") or str(request.rel_url).startswith("/legacy/") or str(request.rel_url).startswith("/steam_resource/") or str(request.rel_url).startswith("/frontend/") or assets_regex.match(str(request.rel_url)) or frontend_regex.match(str(request.rel_url)): return await handler(request) - return Response(text='Forbidden', status='403') + return Response(text='Forbidden', status=403) # Get the default homebrew path unless a home_path is specified. home_path argument is deprecated -def get_homebrew_path(home_path = None) -> str: +def get_homebrew_path() -> str: return localplatform.get_unprivileged_path() # Recursively create path and chown as user -def mkdir_as_user(path): +def mkdir_as_user(path: str): path = os.path.realpath(path) os.makedirs(path, exist_ok=True) localplatform.chown(path) @@ -57,23 +57,18 @@ def get_loader_version() -> str: # returns the appropriate system python paths def get_system_pythonpaths() -> list[str]: - extra_args = {} - - if localplatform.ON_LINUX: - # run as normal normal user to also include user python paths - extra_args["user"] = localplatform.localplatform._get_user_id() - extra_args["env"] = {} - try: + # run as normal normal user if on linux to also include user python paths proc = subprocess.run(["python3" if localplatform.ON_LINUX else "python", "-c", "import sys; print('\\n'.join(x for x in sys.path if x))"], - capture_output=True, **extra_args) + # TODO make this less insane + capture_output=True, user=localplatform.localplatform._get_user_id() if localplatform.ON_LINUX else None, env={} if localplatform.ON_LINUX else None) # type: ignore return [x.strip() for x in proc.stdout.decode().strip().split("\n")] except Exception as e: logger.warn(f"Failed to execute get_system_pythonpaths(): {str(e)}") return [] # Download Remote Binaries to local Plugin -async def download_remote_binary_to_path(url, binHash, path) -> bool: +async def download_remote_binary_to_path(url: str, binHash: str, path: str) -> bool: rv = False try: if os.access(os.path.dirname(path), os.W_OK): @@ -110,46 +105,42 @@ def set_user_group() -> str: # Get the user id hosting the plugin loader def get_user_id() -> int: - return localplatform.localplatform._get_user_id() + return localplatform.localplatform._get_user_id() # pyright: ignore [reportPrivateUsage] # Get the user hosting the plugin loader def get_user() -> str: - return localplatform.localplatform._get_user() + return localplatform.localplatform._get_user() # pyright: ignore [reportPrivateUsage] # Get the effective user id of the running process def get_effective_user_id() -> int: - return localplatform.localplatform._get_effective_user_id() + return localplatform.localplatform._get_effective_user_id() # pyright: ignore [reportPrivateUsage] # Get the effective user of the running process def get_effective_user() -> str: - return localplatform.localplatform._get_effective_user() + return localplatform.localplatform._get_effective_user() # pyright: ignore [reportPrivateUsage] # Get the effective user group id of the running process def get_effective_user_group_id() -> int: - return localplatform.localplatform._get_effective_user_group_id() + return localplatform.localplatform._get_effective_user_group_id() # pyright: ignore [reportPrivateUsage] # Get the effective user group of the running process def get_effective_user_group() -> str: - return localplatform.localplatform._get_effective_user_group() + return localplatform.localplatform._get_effective_user_group() # pyright: ignore [reportPrivateUsage] # Get the user owner of the given file path. -def get_user_owner(file_path) -> str: - return localplatform.localplatform._get_user_owner(file_path) +def get_user_owner(file_path: str) -> str: + return localplatform.localplatform._get_user_owner(file_path) # pyright: ignore [reportPrivateUsage] -# Get the user group of the given file path. -def get_user_group(file_path) -> str: - return localplatform.localplatform._get_user_group(file_path) +# Get the user group of the given file path, or the user group hosting the plugin loader +def get_user_group(file_path: str | None = None) -> str: + return localplatform.localplatform._get_user_group(file_path) # pyright: ignore [reportPrivateUsage] # Get the group id of the user hosting the plugin loader def get_user_group_id() -> int: - return localplatform.localplatform._get_user_group_id() - -# Get the group of the user hosting the plugin loader -def get_user_group() -> str: - return localplatform.localplatform._get_user_group() + return localplatform.localplatform._get_user_group_id() # pyright: ignore [reportPrivateUsage] # Get the default home path unless a user is specified -def get_home_path(username = None) -> str: +def get_home_path(username: str | None = None) -> str: return localplatform.get_home_path(UserType.ROOT if username == "root" else UserType.HOST_USER) async def is_systemd_unit_active(unit_name: str) -> bool: diff --git a/backend/injector.py b/backend/injector.py index e3414fee..a217f689 100644 --- a/backend/injector.py +++ b/backend/injector.py @@ -2,10 +2,9 @@ from asyncio import sleep from logging import getLogger -from traceback import format_exc -from typing import List +from typing import Any, Callable, List, TypedDict, Dict -from aiohttp import ClientSession, WSMsgType +from aiohttp import ClientSession from aiohttp.client_exceptions import ClientConnectorError, ClientOSError from asyncio.exceptions import TimeoutError import uuid @@ -14,35 +13,43 @@ BASE_ADDRESS = "http://localhost:8080" logger = getLogger("Injector") +class _TabResponse(TypedDict): + title: str + id: str + url: str + webSocketDebuggerUrl: str class Tab: cmd_id = 0 - def __init__(self, res) -> None: - self.title = res["title"] - self.id = res["id"] - self.url = res["url"] - self.ws_url = res["webSocketDebuggerUrl"] + def __init__(self, res: _TabResponse) -> None: + self.title: str = res["title"] + self.id: str = res["id"] + self.url: str = res["url"] + self.ws_url: str = res["webSocketDebuggerUrl"] self.websocket = None self.client = None async def open_websocket(self): self.client = ClientSession() - self.websocket = await self.client.ws_connect(self.ws_url) + self.websocket = await self.client.ws_connect(self.ws_url) # type: ignore async def close_websocket(self): - await self.websocket.close() - await self.client.close() + if self.websocket: + await self.websocket.close() + if self.client: + await self.client.close() async def listen_for_message(self): - async for message in self.websocket: - data = message.json() - yield data - logger.warn(f"The Tab {self.title} socket has been disconnected while listening for messages.") - await self.close_websocket() + if self.websocket: + async for message in self.websocket: + data = message.json() + yield data + logger.warn(f"The Tab {self.title} socket has been disconnected while listening for messages.") + await self.close_websocket() - async def _send_devtools_cmd(self, dc, receive=True): + async def _send_devtools_cmd(self, dc: Dict[str, Any], receive: bool = True): if self.websocket: self.cmd_id += 1 dc["id"] = self.cmd_id @@ -54,7 +61,7 @@ class Tab: return None raise RuntimeError("Websocket not opened") - async def evaluate_js(self, js, run_async=False, manage_socket=True, get_result=True): + async def evaluate_js(self, js: str, run_async: bool | None = False, manage_socket: bool | None = True, get_result: bool = True): try: if manage_socket: await self.open_websocket() @@ -73,15 +80,16 @@ class Tab: await self.close_websocket() return res - async def has_global_var(self, var_name, manage_socket=True): + async def has_global_var(self, var_name: str, manage_socket: bool = True): res = await self.evaluate_js(f"window['{var_name}'] !== null && window['{var_name}'] !== undefined", False, manage_socket) + assert res is not None if not "result" in res or not "result" in res["result"] or not "value" in res["result"]["result"]: return False return res["result"]["result"]["value"] - async def close(self, manage_socket=True): + async def close(self, manage_socket: bool = True): try: if manage_socket: await self.open_websocket() @@ -111,7 +119,7 @@ class Tab: "method": "Page.disable", }, False) - async def refresh(self, manage_socket=True): + async def refresh(self, manage_socket: bool = True): try: if manage_socket: await self.open_websocket() @@ -125,7 +133,7 @@ class Tab: await self.close_websocket() return - async def reload_and_evaluate(self, js, manage_socket=True): + async def reload_and_evaluate(self, js: str, manage_socket: bool = True): """ Reloads the current tab, with JS to run on load via debugger """ @@ -153,11 +161,13 @@ class Tab: } }, True) + assert breakpoint_res is not None + logger.info(breakpoint_res) # Page finishes loading when breakpoint hits - for x in range(20): + for _ in range(20): # this works around 1/5 of the time, so just send it 8 times. # the js accounts for being injected multiple times allowing only one instance to run at a time anyway await self._send_devtools_cmd({ @@ -176,7 +186,7 @@ class Tab: } }, False) - for x in range(4): + for _ in range(4): await self._send_devtools_cmd({ "method": "Debugger.resume" }, False) @@ -190,7 +200,7 @@ class Tab: await self.close_websocket() return - async def add_script_to_evaluate_on_new_document(self, js, add_dom_wrapper=True, manage_socket=True, get_result=True): + async def add_script_to_evaluate_on_new_document(self, js: str, add_dom_wrapper: bool = True, manage_socket: bool = True, get_result: bool = True): """ How the underlying call functions is not particularly clear from the devtools docs, so stealing puppeteer's description: @@ -253,7 +263,7 @@ class Tab: await self.close_websocket() return res - async def remove_script_to_evaluate_on_new_document(self, script_id, manage_socket=True): + async def remove_script_to_evaluate_on_new_document(self, script_id: str, manage_socket: bool = True): """ Removes a script from a page that was added with `add_script_to_evaluate_on_new_document` @@ -267,7 +277,7 @@ class Tab: if manage_socket: await self.open_websocket() - res = await self._send_devtools_cmd({ + await self._send_devtools_cmd({ "method": "Page.removeScriptToEvaluateOnNewDocument", "params": { "identifier": script_id @@ -278,15 +288,16 @@ class Tab: if manage_socket: await self.close_websocket() - async def has_element(self, element_name, manage_socket=True): + async def has_element(self, element_name: str, manage_socket: bool = True): res = await self.evaluate_js(f"document.getElementById('{element_name}') != null", False, manage_socket) + assert res is not None if not "result" in res or not "result" in res["result"] or not "value" in res["result"]["result"]: return False return res["result"]["result"]["value"] - async def inject_css(self, style, manage_socket=True): + async def inject_css(self, style: str, manage_socket: bool = True): try: css_id = str(uuid.uuid4()) @@ -300,6 +311,8 @@ class Tab: }})() """, False, manage_socket) + assert result is not None + if "exceptionDetails" in result["result"]: return { "success": False, @@ -316,7 +329,7 @@ class Tab: "result": e } - async def remove_css(self, css_id, manage_socket=True): + async def remove_css(self, css_id: str, manage_socket: bool = True): try: result = await self.evaluate_js( f""" @@ -328,6 +341,8 @@ class Tab: }})() """, False, manage_socket) + assert result is not None + if "exceptionDetails" in result["result"]: return { "success": False, @@ -343,8 +358,9 @@ class Tab: "result": e } - async def get_steam_resource(self, url): + async def get_steam_resource(self, url: str): res = await self.evaluate_js(f'(async function test() {{ return await (await fetch("{url}")).text() }})()', True) + assert res is not None return res["result"]["result"]["value"] def __repr__(self): @@ -380,14 +396,14 @@ async def get_tabs() -> List[Tab]: raise Exception(f"/json did not return 200. {await res.text()}") -async def get_tab(tab_name) -> Tab: +async def get_tab(tab_name: str) -> Tab: tabs = await get_tabs() tab = next((i for i in tabs if i.title == tab_name), None) if not tab: raise ValueError(f"Tab {tab_name} not found") return tab -async def get_tab_lambda(test) -> Tab: +async def get_tab_lambda(test: Callable[[Tab], bool]) -> Tab: tabs = await get_tabs() tab = next((i for i in tabs if test(i)), None) if not tab: @@ -408,7 +424,7 @@ async def get_gamepadui_tab() -> Tab: raise ValueError(f"GamepadUI Tab not found") return tab -async def inject_to_tab(tab_name, js, run_async=False): +async def inject_to_tab(tab_name: str, js: str, run_async: bool = False): tab = await get_tab(tab_name) return await tab.evaluate_js(js, run_async) diff --git a/backend/loader.py b/backend/loader.py index d07b1c08..89bb0457 100644 --- a/backend/loader.py +++ b/backend/loader.py @@ -1,34 +1,40 @@ -from asyncio import Queue, sleep +from asyncio import AbstractEventLoop, Queue, sleep from json.decoder import JSONDecodeError from logging import getLogger from os import listdir, path from pathlib import Path from traceback import print_exc +from typing import Any, Tuple from aiohttp import web from os.path import exists -from watchdog.events import RegexMatchingEventHandler -from watchdog.observers import Observer +from watchdog.events import RegexMatchingEventHandler, DirCreatedEvent, DirModifiedEvent, FileCreatedEvent, FileModifiedEvent # type: ignore +from watchdog.observers import Observer # type: ignore -from injector import get_tab, get_gamepadui_tab -from plugin import PluginWrapper +from backend.main import PluginManager # type: ignore + +from .injector import get_tab, get_gamepadui_tab +from .plugin import PluginWrapper + +Plugins = dict[str, PluginWrapper] +ReloadQueue = Queue[Tuple[str, str, bool | None] | Tuple[str, str]] class FileChangeHandler(RegexMatchingEventHandler): - def __init__(self, queue, plugin_path) -> None: - super().__init__(regexes=[r'^.*?dist\/index\.js$', r'^.*?main\.py$']) + def __init__(self, queue: ReloadQueue, plugin_path: str) -> None: + super().__init__(regexes=[r'^.*?dist\/index\.js$', r'^.*?main\.py$']) # type: ignore self.logger = getLogger("file-watcher") self.plugin_path = plugin_path self.queue = queue self.disabled = True - def maybe_reload(self, src_path): + def maybe_reload(self, src_path: str): if self.disabled: return plugin_dir = Path(path.relpath(src_path, self.plugin_path)).parts[0] if exists(path.join(self.plugin_path, plugin_dir, "plugin.json")): self.queue.put_nowait((path.join(self.plugin_path, plugin_dir, "main.py"), plugin_dir, True)) - def on_created(self, event): + def on_created(self, event: DirCreatedEvent | FileCreatedEvent): src_path = event.src_path if "__pycache__" in src_path: return @@ -42,7 +48,7 @@ class FileChangeHandler(RegexMatchingEventHandler): self.logger.debug(f"file created: {src_path}") self.maybe_reload(src_path) - def on_modified(self, event): + def on_modified(self, event: DirModifiedEvent | FileModifiedEvent): src_path = event.src_path if "__pycache__" in src_path: return @@ -57,25 +63,25 @@ class FileChangeHandler(RegexMatchingEventHandler): self.maybe_reload(src_path) class Loader: - def __init__(self, server_instance, plugin_path, loop, live_reload=False) -> None: + def __init__(self, server_instance: PluginManager, plugin_path: str, loop: AbstractEventLoop, live_reload: bool =False) -> None: self.loop = loop self.logger = getLogger("Loader") self.plugin_path = plugin_path self.logger.info(f"plugin_path: {self.plugin_path}") - self.plugins : dict[str, PluginWrapper] = {} + self.plugins: Plugins = {} self.watcher = None self.live_reload = live_reload - self.reload_queue = Queue() + self.reload_queue: ReloadQueue = Queue() self.loop.create_task(self.handle_reloads()) if live_reload: self.observer = Observer() self.watcher = FileChangeHandler(self.reload_queue, plugin_path) - self.observer.schedule(self.watcher, self.plugin_path, recursive=True) + self.observer.schedule(self.watcher, self.plugin_path, recursive=True) # type: ignore self.observer.start() self.loop.create_task(self.enable_reload_wait()) - server_instance.add_routes([ + server_instance.web_app.add_routes([ web.get("/frontend/{path:.*}", self.handle_frontend_assets), web.get("/locales/{path:.*}", self.handle_frontend_locales), web.get("/plugins", self.get_plugins), @@ -93,15 +99,16 @@ class Loader: async def enable_reload_wait(self): if self.live_reload: await sleep(10) - self.logger.info("Hot reload enabled") - self.watcher.disabled = False + if self.watcher: + self.logger.info("Hot reload enabled") + self.watcher.disabled = False - async def handle_frontend_assets(self, request): + async def handle_frontend_assets(self, request: web.Request): file = path.join(path.dirname(__file__), "static", request.match_info["path"]) return web.FileResponse(file, headers={"Cache-Control": "no-cache"}) - async def handle_frontend_locales(self, request): + async def handle_frontend_locales(self, request: web.Request): req_lang = request.match_info["path"] file = path.join(path.dirname(__file__), "locales", req_lang) if exists(file): @@ -110,23 +117,23 @@ class Loader: self.logger.info(f"Language {req_lang} not available, returning an empty dictionary") return web.json_response(data={}, headers={"Cache-Control": "no-cache"}) - async def get_plugins(self, request): + async def get_plugins(self, request: web.Request): plugins = list(self.plugins.values()) return web.json_response([{"name": str(i) if not i.legacy else "$LEGACY_"+str(i), "version": i.version} for i in plugins]) - def handle_plugin_frontend_assets(self, request): + async def handle_plugin_frontend_assets(self, request: web.Request): plugin = self.plugins[request.match_info["plugin_name"]] file = path.join(self.plugin_path, plugin.plugin_directory, "dist/assets", request.match_info["path"]) return web.FileResponse(file, headers={"Cache-Control": "no-cache"}) - def handle_frontend_bundle(self, request): + async def handle_frontend_bundle(self, request: web.Request): plugin = self.plugins[request.match_info["plugin_name"]] with open(path.join(self.plugin_path, plugin.plugin_directory, "dist/index.js"), "r", encoding="utf-8") as bundle: return web.Response(text=bundle.read(), content_type="application/javascript") - def import_plugin(self, file, plugin_directory, refresh=False, batch=False): + def import_plugin(self, file: str, plugin_directory: str, refresh: bool | None = False, batch: bool | None = False): try: plugin = PluginWrapper(file, plugin_directory, self.plugin_path) if plugin.name in self.plugins: @@ -146,7 +153,7 @@ class Loader: self.logger.error(f"Could not load {file}. {e}") print_exc() - async def dispatch_plugin(self, name, version): + async def dispatch_plugin(self, name: str, version: str | None): gpui_tab = await get_gamepadui_tab() await gpui_tab.evaluate_js(f"window.importDeckyPlugin('{name}', '{version}')") @@ -161,15 +168,15 @@ class Loader: async def handle_reloads(self): while True: args = await self.reload_queue.get() - self.import_plugin(*args) + self.import_plugin(*args) # type: ignore - async def handle_plugin_method_call(self, request): + async def handle_plugin_method_call(self, request: web.Request): res = {} plugin = self.plugins[request.match_info["plugin_name"]] method_name = request.match_info["method_name"] try: method_info = await request.json() - args = method_info["args"] + args: Any = method_info["args"] except JSONDecodeError: args = {} try: @@ -189,7 +196,7 @@ class Loader: can introduce it more smoothly and give people the chance to sample the new features even without plugin support. They will be removed once legacy plugins are no longer relevant. """ - async def load_plugin_main_view(self, request): + async def load_plugin_main_view(self, request: web.Request): plugin = self.plugins[request.match_info["name"]] with open(path.join(self.plugin_path, plugin.plugin_directory, plugin.main_view_html), "r", encoding="utf-8") as template: template_data = template.read() @@ -201,7 +208,7 @@ class Loader: """ return web.Response(text=ret, content_type="text/html") - async def handle_sub_route(self, request): + async def handle_sub_route(self, request: web.Request): plugin = self.plugins[request.match_info["name"]] route_path = request.match_info["path"] self.logger.info(path) @@ -212,14 +219,14 @@ class Loader: return web.Response(text=ret) - async def get_steam_resource(self, request): + async def get_steam_resource(self, request: web.Request): tab = await get_tab("SP") try: return web.Response(text=await tab.get_steam_resource(f"https://steamloopback.host/{request.match_info['path']}"), content_type="text/html") except Exception as e: return web.Response(text=str(e), status=400) - async def handle_backend_reload_request(self, request): + async def handle_backend_reload_request(self, request: web.Request): plugin_name : str = request.match_info["plugin_name"] plugin = self.plugins[plugin_name] diff --git a/backend/localplatformlinux.py b/backend/localplatformlinux.py index 811db8a6..58b9dbc2 100644 --- a/backend/localplatformlinux.py +++ b/backend/localplatformlinux.py @@ -29,21 +29,17 @@ def _get_effective_user_group() -> str: return grp.getgrgid(_get_effective_user_group_id()).gr_name # Get the user owner of the given file path. -def _get_user_owner(file_path) -> str: +def _get_user_owner(file_path: str) -> str: return pwd.getpwuid(os.stat(file_path).st_uid).pw_name -# Get the user group of the given file path. -def _get_user_group(file_path) -> str: - return grp.getgrgid(os.stat(file_path).st_gid).gr_name +# Get the user group of the given file path, or the user group hosting the plugin loader +def _get_user_group(file_path: str | None = None) -> str: + return grp.getgrgid(os.stat(file_path).st_gid if file_path is not None else _get_user_group_id()).gr_name # Get the group id of the user hosting the plugin loader def _get_user_group_id() -> int: return pwd.getpwuid(_get_user_id()).pw_gid -# Get the group of the user hosting the plugin loader -def _get_user_group() -> str: - return grp.getgrgid(_get_user_group_id()).gr_name - def chown(path : str, user : UserType = UserType.HOST_USER, recursive : bool = True) -> bool: user_str = "" @@ -146,7 +142,7 @@ def get_privileged_path() -> str: return path -def _parent_dir(path : str) -> str: +def _parent_dir(path : str | None) -> str | None: if path == None: return None @@ -166,7 +162,7 @@ def get_unprivileged_path() -> str: # Expected path of loader binary is /home/deck/homebrew/service/PluginLoader path = _parent_dir(_parent_dir(os.path.realpath(sys.argv[0]))) - if not os.path.exists(path): + if path != None and not os.path.exists(path): path = None if path == None: diff --git a/backend/main.py b/backend/main.py index ae11b066..433b202f 100644 --- a/backend/main.py +++ b/backend/main.py @@ -7,16 +7,16 @@ from localplatform import (chmod, chown, service_stop, service_start, if hasattr(sys, '_MEIPASS'): chmod(sys._MEIPASS, 755) # type: ignore # Full imports -from asyncio import new_event_loop, set_event_loop, sleep +from asyncio import AbstractEventLoop, new_event_loop, set_event_loop, sleep from logging import basicConfig, getLogger from os import path from traceback import format_exc import multiprocessing -import aiohttp_cors +import aiohttp_cors # type: ignore # Partial imports from aiohttp import client_exceptions -from aiohttp.web import Application, Response, get, run_app, static +from aiohttp.web import Application, Response, get, run_app, static # type: ignore from aiohttp_jinja2 import setup as jinja_setup # local modules @@ -51,7 +51,7 @@ if get_chown_plugin_path() == True: chown_plugin_dir() class PluginManager: - def __init__(self, loop) -> None: + def __init__(self, loop: AbstractEventLoop) -> None: self.loop = loop self.web_app = Application() self.web_app.middlewares.append(csrf_middleware) @@ -62,7 +62,7 @@ class PluginManager: allow_credentials=True ) }) - self.plugin_loader = Loader(self.web_app, plugin_path, self.loop, get_live_reload()) + self.plugin_loader = Loader(self, plugin_path, self.loop, get_live_reload()) self.settings = SettingsManager("loader", path.join(get_privileged_path(), "settings")) self.plugin_browser = PluginBrowser(plugin_path, self.plugin_loader.plugins, self.plugin_loader, self.settings) self.utilities = Utilities(self) diff --git a/backend/plugin.py b/backend/plugin.py index 026a6b09..781d9f7b 100644 --- a/backend/plugin.py +++ b/backend/plugin.py @@ -1,7 +1,6 @@ import multiprocessing from asyncio import (Lock, get_event_loop, new_event_loop, set_event_loop, sleep) -from concurrent.futures import ProcessPoolExecutor from importlib.util import module_from_spec, spec_from_file_location from json import dumps, load, loads from logging import getLogger @@ -9,14 +8,14 @@ from traceback import format_exc from os import path, environ from signal import SIGINT, signal from sys import exit, path as syspath -from time import time +from typing import Any, Dict from localsocket import LocalSocket from localplatform import setgid, setuid, get_username, get_home_path from customtypes import UserType import helpers class PluginWrapper: - def __init__(self, file, plugin_directory, plugin_path) -> None: + def __init__(self, file: str, plugin_directory: str, plugin_path: str) -> None: self.file = file self.plugin_path = plugin_path self.plugin_directory = plugin_directory @@ -73,14 +72,17 @@ class PluginWrapper: helpers.mkdir_as_user(environ["DECKY_PLUGIN_LOG_DIR"]) environ["DECKY_PLUGIN_DIR"] = path.join(self.plugin_path, self.plugin_directory) environ["DECKY_PLUGIN_NAME"] = self.name - environ["DECKY_PLUGIN_VERSION"] = self.version + if self.version: + environ["DECKY_PLUGIN_VERSION"] = self.version environ["DECKY_PLUGIN_AUTHOR"] = self.author # append the plugin's `py_modules` to the recognized python paths syspath.append(path.join(environ["DECKY_PLUGIN_DIR"], "py_modules")) spec = spec_from_file_location("_", self.file) + assert spec is not None module = module_from_spec(spec) + assert spec.loader is not None spec.loader.exec_module(module) self.Plugin = module.Plugin @@ -118,7 +120,8 @@ class PluginWrapper: get_event_loop().close() raise Exception("Closing message listener") - d = {"res": None, "success": True} + # TODO there is definitely a better way to type this + d: Dict[str, Any] = {"res": None, "success": True} try: d["res"] = await getattr(self.Plugin, data["method"])(self.Plugin, **data["args"]) except Exception as e: @@ -137,17 +140,18 @@ class PluginWrapper: if self.passive: return - async def _(self): + async def _(self: PluginWrapper): await self.socket.write_single_line(dumps({ "stop": True }, ensure_ascii=False)) await self.socket.close_socket_connection() get_event_loop().create_task(_(self)) - async def execute_method(self, method_name, kwargs): + async def execute_method(self, method_name: str, kwargs: Dict[Any, Any]): if self.passive: raise RuntimeError("This plugin is passive (aka does not implement main.py)") async with self.method_call_lock: - reader, writer = await self.socket.get_socket_connection() + # reader, writer = + await self.socket.get_socket_connection() await self.socket.write_single_line(dumps({ "method": method_name, "args": kwargs }, ensure_ascii=False)) diff --git a/backend/settings.py b/backend/settings.py index c00e6a82..26dfc97f 100644 --- a/backend/settings.py +++ b/backend/settings.py @@ -1,5 +1,6 @@ from json import dump, load from os import mkdir, path, listdir, rename +from typing import Any, Dict from localplatform import chown, folder_owner, get_chown_plugin_path from customtypes import UserType @@ -7,7 +8,7 @@ from helpers import get_homebrew_path class SettingsManager: - def __init__(self, name, settings_directory = None) -> None: + def __init__(self, name: str, settings_directory: str | None = None) -> None: wrong_dir = get_homebrew_path() if settings_directory == None: settings_directory = path.join(wrong_dir, "settings") @@ -31,11 +32,11 @@ class SettingsManager: if folder_owner(settings_directory) != expected_user: chown(settings_directory, expected_user, False) - self.settings = {} + self.settings: Dict[str, Any] = {} try: open(self.path, "x", encoding="utf-8") - except FileExistsError as e: + except FileExistsError as _: self.read() pass @@ -51,9 +52,9 @@ class SettingsManager: with open(self.path, "w+", encoding="utf-8") as file: dump(self.settings, file, indent=4, ensure_ascii=False) - def getSetting(self, key, default=None): + def getSetting(self, key: str, default: Any = None) -> Any: return self.settings.get(key, default) - def setSetting(self, key, value): + def setSetting(self, key: str, value: Any) -> Any: self.settings[key] = value self.commit() diff --git a/backend/utilities.py b/backend/utilities.py index bcb35578..72b6f008 100644 --- a/backend/utilities.py +++ b/backend/utilities.py @@ -1,26 +1,27 @@ import uuid -import os from json.decoder import JSONDecodeError from os.path import splitext import re from traceback import format_exc -from stat import FILE_ATTRIBUTE_HIDDEN +from stat import FILE_ATTRIBUTE_HIDDEN # type: ignore -from asyncio import sleep, start_server, gather, open_connection +from asyncio import start_server, gather, open_connection from aiohttp import ClientSession, web +from typing import Dict from logging import getLogger +from backend.browser import PluginInstallType +from backend.main import PluginManager from injector import inject_to_tab, get_gamepadui_tab, close_old_tabs, get_tab from pathlib import Path from localplatform import ON_WINDOWS import helpers -import subprocess from localplatform import service_stop, service_start, get_home_path, get_username class Utilities: - def __init__(self, context) -> None: + def __init__(self, context: PluginManager) -> None: self.context = context - self.util_methods = { + self.util_methods: Dict[] = { "ping": self.ping, "http_request": self.http_request, "install_plugin": self.install_plugin, @@ -69,7 +70,7 @@ class Utilities: res["success"] = False return web.json_response(res) - async def install_plugin(self, artifact="", name="No name", version="dev", hash=False, install_type=0): + async def install_plugin(self, artifact="", name="No name", version="dev", hash=False, install_type=PluginInstallType.INSTALL): return await self.context.plugin_browser.request_plugin_install( artifact=artifact, name=name, |
