from __future__ import annotations from os import stat_result, listdir import uuid from json.decoder import JSONDecodeError from os.path import splitext import re from traceback import format_exc from stat import FILE_ATTRIBUTE_HIDDEN # type: ignore from asyncio import StreamReader, StreamWriter, start_server, gather, open_connection from aiohttp import ClientSession, web from typing import TYPE_CHECKING, Callable, Coroutine, Dict, Any, List, TypedDict from logging import getLogger from pathlib import Path from .browser import PluginInstallRequest, PluginInstallType if TYPE_CHECKING: from .main import PluginManager from .injector import inject_to_tab, get_gamepadui_tab, close_old_tabs, get_tab from .localplatform import ON_WINDOWS from . import helpers from .localplatform import service_stop, service_start, get_home_path, get_username class FilePickerObj(TypedDict): file: Path filest: stat_result is_dir: bool class Utilities: def __init__(self, context: PluginManager) -> None: self.context = context self.util_methods: Dict[str, Callable[..., Coroutine[Any, Any, Any]]] = { "ping": self.ping, "http_request": self.http_request, "install_plugin": self.install_plugin, "install_plugins": self.install_plugins, "cancel_plugin_install": self.cancel_plugin_install, "confirm_plugin_install": self.confirm_plugin_install, "uninstall_plugin": self.uninstall_plugin, "execute_in_tab": self.execute_in_tab, "inject_css_into_tab": self.inject_css_into_tab, "remove_css_from_tab": self.remove_css_from_tab, "allow_remote_debugging": self.allow_remote_debugging, "disallow_remote_debugging": self.disallow_remote_debugging, "set_setting": self.set_setting, "get_setting": self.get_setting, "filepicker_ls": self.filepicker_ls, "disable_rdt": self.disable_rdt, "enable_rdt": self.enable_rdt, "get_tab_id": self.get_tab_id, "get_user_info": self.get_user_info, "get_plugin_logs": self.get_plugin_logs, "get_plugins_with_logs": self.get_plugins_with_logs, "get_plugin_log_text": self.get_plugin_log_text, "upload_log": self.upload_log } self.logger = getLogger("Utilities") self.rdt_proxy_server = None self.rdt_script_id = None self.rdt_proxy_task = None if context: context.web_app.add_routes( [web.post("/methods/{method_name}", self._handle_server_method_call)] ) async def _handle_server_method_call(self, request: web.Request): method_name = request.match_info["method_name"] try: args = await request.json() except JSONDecodeError: args = {} res = {} try: r = await self.util_methods[method_name](**args) res["result"] = r res["success"] = True except Exception as e: res["result"] = str(e) res["success"] = False return web.json_response(res) async def install_plugin( self, artifact: str = "", name: str = "No name", version: str = "dev", hash: str = "", install_type: PluginInstallType = PluginInstallType.INSTALL, ): return await self.context.plugin_browser.request_plugin_install( artifact=artifact, name=name, version=version, hash=hash, install_type=install_type, ) async def install_plugins(self, requests: List[PluginInstallRequest]): return await self.context.plugin_browser.request_multiple_plugin_installs( requests=requests ) async def confirm_plugin_install(self, request_id: str): return await self.context.plugin_browser.confirm_plugin_install(request_id) async def cancel_plugin_install(self, request_id: str): return self.context.plugin_browser.cancel_plugin_install(request_id) async def uninstall_plugin(self, name: str): return await self.context.plugin_browser.uninstall_plugin(name) async def http_request(self, method: str = "", url: str = "", **kwargs: Any): async with ClientSession() as web: res = await web.request( method, url, ssl=helpers.get_ssl_context(), **kwargs ) text = await res.text() return {"status": res.status, "headers": dict(res.headers), "body": text} async def ping(self, **kwargs: Any): return "pong" async def execute_in_tab(self, tab: str, run_async: bool, code: str): try: result = await inject_to_tab(tab, code, run_async) assert result if "exceptionDetails" in result["result"]: return {"success": False, "result": result["result"]} return {"success": True, "result": result["result"]["result"].get("value")} except Exception as e: return {"success": False, "result": e} async def inject_css_into_tab(self, tab: str, style: str): try: css_id = str(uuid.uuid4()) result = await inject_to_tab( tab, f""" (function() {{ const style = document.createElement('style'); style.id = "{css_id}"; document.head.append(style); style.textContent = `{style}`; }})() """, False, ) if result and "exceptionDetails" in result["result"]: return {"success": False, "result": result["result"]} return {"success": True, "result": css_id} except Exception as e: return {"success": False, "result": e} async def remove_css_from_tab(self, tab: str, css_id: str): try: result = await inject_to_tab( tab, f""" (function() {{ let style = document.getElementById("{css_id}"); if (style.nodeName.toLowerCase() == 'style') style.parentNode.removeChild(style); }})() """, False, ) if result and "exceptionDetails" in result["result"]: return {"success": False, "result": result} return {"success": True} except Exception as e: return {"success": False, "result": e} async def get_setting(self, key: str, default: Any): return self.context.settings.getSetting(key, default) async def set_setting(self, key: str, value: Any): return self.context.settings.setSetting(key, value) async def allow_remote_debugging(self): await service_start(helpers.REMOTE_DEBUGGER_UNIT) return True async def disallow_remote_debugging(self): await service_stop(helpers.REMOTE_DEBUGGER_UNIT) return True async def filepicker_ls( self, path: str | None = None, include_files: bool = True, include_folders: bool = True, include_ext: list[str] = [], include_hidden: bool = False, order_by: str = "name_asc", filter_for: str | None = None, page: int = 1, max: int = 1000, ): if path == None: path = get_home_path() path_obj = Path(path).resolve() files: List[FilePickerObj] = [] folders: List[FilePickerObj] = [] # Resolving all files/folders in the requested directory for file in path_obj.iterdir(): if file.exists(): filest = file.stat() is_hidden = file.name.startswith(".") if ON_WINDOWS and not is_hidden: is_hidden = bool(filest.st_file_attributes & FILE_ATTRIBUTE_HIDDEN) # type: ignore if include_folders and file.is_dir(): if (is_hidden and include_hidden) or not is_hidden: folders.append({"file": file, "filest": filest, "is_dir": True}) elif include_files: # Handle requested extensions if present if len(include_ext) == 0 or 'all_files' in include_ext \ or splitext(file.name)[1].lstrip('.').upper() in (ext.upper() for ext in include_ext): if (is_hidden and include_hidden) or not is_hidden: files.append( {"file": file, "filest": filest, "is_dir": False} ) # Filter logic if filter_for is not None: try: if re.compile(filter_for): files = list( filter( lambda file: re.search(filter_for, file["file"].name) != None, files, ) ) except re.error: files = list( filter(lambda file: file["file"].name.find(filter_for) != -1, files) ) # Ordering logic ord_arg = order_by.split("_") ord = ord_arg[0] rev = True if ord_arg[1] == "asc" else False match ord: case "name": files.sort(key=lambda x: x["file"].name.casefold(), reverse=rev) folders.sort(key=lambda x: x["file"].name.casefold(), reverse=rev) case "modified": files.sort(key=lambda x: x["filest"].st_mtime, reverse=not rev) folders.sort(key=lambda x: x["filest"].st_mtime, reverse=not rev) case "created": files.sort(key=lambda x: x["filest"].st_ctime, reverse=not rev) folders.sort(key=lambda x: x["filest"].st_ctime, reverse=not rev) case "size": files.sort(key=lambda x: x["filest"].st_size, reverse=not rev) # Folders has no file size, order by name instead folders.sort(key=lambda x: x["file"].name.casefold()) case _: files.sort(key=lambda x: x["file"].name.casefold(), reverse=rev) folders.sort(key=lambda x: x["file"].name.casefold(), reverse=rev) # Constructing the final file list, folders first all = [ { "isdir": x["is_dir"], "name": str(x["file"].name), "realpath": str(x["file"]), "size": x["filest"].st_size, "modified": x["filest"].st_mtime, "created": x["filest"].st_ctime, } for x in folders + files ] return { "realpath": str(path), "files": all[(page - 1) * max : (page) * max], "total": len(all), } # Based on https://stackoverflow.com/a/46422554/13174603 def start_rdt_proxy(self, ip: str, port: int): async def pipe(reader: StreamReader, writer: StreamWriter): try: while not reader.at_eof(): writer.write(await reader.read(2048)) finally: writer.close() async def handle_client(local_reader: StreamReader, local_writer: StreamWriter): try: remote_reader, remote_writer = await open_connection(ip, port) pipe1 = pipe(local_reader, remote_writer) pipe2 = pipe(remote_reader, local_writer) await gather(pipe1, pipe2) finally: local_writer.close() self.rdt_proxy_server = start_server(handle_client, "127.0.0.1", port) self.rdt_proxy_task = self.context.loop.create_task(self.rdt_proxy_server) def stop_rdt_proxy(self): if self.rdt_proxy_server != None: self.rdt_proxy_server.close() if self.rdt_proxy_task: self.rdt_proxy_task.cancel() async def _enable_rdt(self): # TODO un-hardcode port try: self.stop_rdt_proxy() ip = self.context.settings.getSetting("developer.rdt.ip", None) if ip != None: self.logger.info("Connecting to React DevTools at " + ip) async with ClientSession() as web: res = await web.request( "GET", "http://" + ip + ":8097", ssl=helpers.get_ssl_context() ) script = ( """ if (!window.deckyHasConnectedRDT) { window.deckyHasConnectedRDT = true; // This fixes the overlay when hovering over an element in RDT Object.defineProperty(window, '__REACT_DEVTOOLS_TARGET_WINDOW__', { enumerable: true, configurable: true, get: function() { return (GamepadNavTree?.m_context?.m_controller || FocusNavController)?.m_ActiveContext?.ActiveWindow || window; } }); """ + await res.text() + "\n}" ) if res.status != 200: self.logger.error("Failed to connect to React DevTools at " + ip) return False self.start_rdt_proxy(ip, 8097) self.logger.info("Connected to React DevTools, loading script") tab = await get_gamepadui_tab() # RDT needs to load before React itself to work. await close_old_tabs() result = await tab.reload_and_evaluate(script) self.logger.info(result) except Exception: self.logger.error("Failed to connect to React DevTools") self.logger.error(format_exc()) async def enable_rdt(self): self.context.loop.create_task(self._enable_rdt()) async def disable_rdt(self): self.logger.info("Disabling React DevTools") tab = await get_gamepadui_tab() self.rdt_script_id = None await close_old_tabs() await tab.evaluate_js("location.reload();", False, True, False) self.logger.info("React DevTools disabled") async def get_user_info(self) -> Dict[str, str]: return {"username": get_username(), "path": get_home_path()} async def get_tab_id(self, name: str): return (await get_tab(name)).id async def get_plugins_with_logs(self): return list(self.context.plugin_loader.plugins.keys()) async def get_plugin_logs(self, plugin_name: str): log_path = Path(helpers.get_homebrew_path()) / "logs" / \ self.context.plugin_loader.plugins[plugin_name].plugin_directory return listdir(log_path) async def get_plugin_log_text(self, plugin_name: str, log_name: str): log_path = Path(helpers.get_homebrew_path()) / "logs" / \ self.context.plugin_loader.plugins[plugin_name].plugin_directory / log_name with open(log_path, "r") as log_file: return log_file.read() async def upload_log(self, plugin_name: str, log_name: str): text = await self.get_plugin_log_text(plugin_name, log_name) async with ClientSession() as web: res = await web.put("https://lp.deckbrew.xyz", data=text) res.raise_for_status() upload_id = (await res.json())["id"] return f"https://lp.deckbrew.xyz/{upload_id}"