summaryrefslogtreecommitdiff
path: root/backend/src
diff options
context:
space:
mode:
Diffstat (limited to 'backend/src')
-rw-r--r--backend/src/utilities.py244
1 files changed, 135 insertions, 109 deletions
diff --git a/backend/src/utilities.py b/backend/src/utilities.py
index edde682d..92715645 100644
--- a/backend/src/utilities.py
+++ b/backend/src/utilities.py
@@ -1,11 +1,11 @@
from __future__ import annotations
-from os import stat_result
+from os import stat_result, listdir
import uuid
from json.decoder import JSONDecodeError
from os.path import splitext
import re
from traceback import format_exc
-from stat import FILE_ATTRIBUTE_HIDDEN # type: ignore
+from stat import FILE_ATTRIBUTE_HIDDEN # type: ignore
from asyncio import StreamReader, StreamWriter, start_server, gather, open_connection
from aiohttp import ClientSession, web
@@ -15,6 +15,7 @@ from logging import getLogger
from pathlib import Path
from .browser import PluginInstallRequest, PluginInstallType
+
if TYPE_CHECKING:
from .main import PluginManager
from .injector import inject_to_tab, get_gamepadui_tab, close_old_tabs, get_tab
@@ -22,11 +23,13 @@ from .localplatform import ON_WINDOWS
from . import helpers
from .localplatform import service_stop, service_start, get_home_path, get_username
+
class FilePickerObj(TypedDict):
file: Path
filest: stat_result
is_dir: bool
+
class Utilities:
def __init__(self, context: PluginManager) -> None:
self.context = context
@@ -50,6 +53,10 @@ class Utilities:
"enable_rdt": self.enable_rdt,
"get_tab_id": self.get_tab_id,
"get_user_info": self.get_user_info,
+ "get_plugin_logs": self.get_plugin_logs,
+ "get_plugins_with_logs": self.get_plugins_with_logs,
+ "get_plugin_log_text": self.get_plugin_log_text,
+ "upload_log": self.upload_log
}
self.logger = getLogger("Utilities")
@@ -59,9 +66,9 @@ class Utilities:
self.rdt_proxy_task = None
if context:
- context.web_app.add_routes([
- web.post("/methods/{method_name}", self._handle_server_method_call)
- ])
+ context.web_app.add_routes(
+ [web.post("/methods/{method_name}", self._handle_server_method_call)]
+ )
async def _handle_server_method_call(self, request: web.Request):
method_name = request.match_info["method_name"]
@@ -79,13 +86,20 @@ class Utilities:
res["success"] = False
return web.json_response(res)
- async def install_plugin(self, artifact: str="", name: str="No name", version: str="dev", hash: str="", install_type: PluginInstallType=PluginInstallType.INSTALL):
+ async def install_plugin(
+ self,
+ artifact: str = "",
+ name: str = "No name",
+ version: str = "dev",
+ hash: str = "",
+ install_type: PluginInstallType = PluginInstallType.INSTALL,
+ ):
return await self.context.plugin_browser.request_plugin_install(
artifact=artifact,
name=name,
version=version,
hash=hash,
- install_type=install_type
+ install_type=install_type,
)
async def install_plugins(self, requests: List[PluginInstallRequest]):
@@ -102,15 +116,13 @@ class Utilities:
async def uninstall_plugin(self, name: str):
return await self.context.plugin_browser.uninstall_plugin(name)
- async def http_request(self, method: str="", url: str="", **kwargs: Any):
+ async def http_request(self, method: str = "", url: str = "", **kwargs: Any):
async with ClientSession() as web:
- res = await web.request(method, url, ssl=helpers.get_ssl_context(), **kwargs)
+ res = await web.request(
+ method, url, ssl=helpers.get_ssl_context(), **kwargs
+ )
text = await res.text()
- return {
- "status": res.status,
- "headers": dict(res.headers),
- "body": text
- }
+ return {"status": res.status, "headers": dict(res.headers), "body": text}
async def ping(self, **kwargs: Any):
return "pong"
@@ -120,26 +132,18 @@ class Utilities:
result = await inject_to_tab(tab, code, run_async)
assert result
if "exceptionDetails" in result["result"]:
- return {
- "success": False,
- "result": result["result"]
- }
-
- return {
- "success": True,
- "result": result["result"]["result"].get("value")
- }
+ return {"success": False, "result": result["result"]}
+
+ return {"success": True, "result": result["result"]["result"].get("value")}
except Exception as e:
- return {
- "success": False,
- "result": e
- }
+ return {"success": False, "result": e}
async def inject_css_into_tab(self, tab: str, style: str):
try:
css_id = str(uuid.uuid4())
- result = await inject_to_tab(tab,
+ result = await inject_to_tab(
+ tab,
f"""
(function() {{
const style = document.createElement('style');
@@ -147,27 +151,21 @@ class Utilities:
document.head.append(style);
style.textContent = `{style}`;
}})()
- """, False)
+ """,
+ False,
+ )
if result and "exceptionDetails" in result["result"]:
- return {
- "success": False,
- "result": result["result"]
- }
-
- return {
- "success": True,
- "result": css_id
- }
+ return {"success": False, "result": result["result"]}
+
+ return {"success": True, "result": css_id}
except Exception as e:
- return {
- "success": False,
- "result": e
- }
+ return {"success": False, "result": e}
async def remove_css_from_tab(self, tab: str, css_id: str):
try:
- result = await inject_to_tab(tab,
+ result = await inject_to_tab(
+ tab,
f"""
(function() {{
let style = document.getElementById("{css_id}");
@@ -175,22 +173,16 @@ class Utilities:
if (style.nodeName.toLowerCase() == 'style')
style.parentNode.removeChild(style);
}})()
- """, False)
+ """,
+ False,
+ )
if result and "exceptionDetails" in result["result"]:
- return {
- "success": False,
- "result": result
- }
+ return {"success": False, "result": result}
- return {
- "success": True
- }
+ return {"success": True}
except Exception as e:
- return {
- "success": False,
- "result": e
- }
+ return {"success": False, "result": e}
async def get_setting(self, key: str, default: Any):
return self.context.settings.getSetting(key, default)
@@ -206,17 +198,19 @@ class Utilities:
await service_stop(helpers.REMOTE_DEBUGGER_UNIT)
return True
- async def filepicker_ls(self,
- path : str | None = None,
- include_files: bool = True,
- include_folders: bool = True,
- include_ext: list[str] = [],
- include_hidden: bool = False,
- order_by: str = "name_asc",
- filter_for: str | None = None,
- page: int = 1,
- max: int = 1000):
-
+ async def filepicker_ls(
+ self,
+ path: str | None = None,
+ include_files: bool = True,
+ include_folders: bool = True,
+ include_ext: list[str] = [],
+ include_hidden: bool = False,
+ order_by: str = "name_asc",
+ filter_for: str | None = None,
+ page: int = 1,
+ max: int = 1000,
+ ):
+
if path == None:
path = get_home_path()
@@ -225,13 +219,13 @@ class Utilities:
files: List[FilePickerObj] = []
folders: List[FilePickerObj] = []
- #Resolving all files/folders in the requested directory
+ # Resolving all files/folders in the requested directory
for file in path_obj.iterdir():
if file.exists():
filest = file.stat()
- is_hidden = file.name.startswith('.')
+ is_hidden = file.name.startswith(".")
if ON_WINDOWS and not is_hidden:
- is_hidden = bool(filest.st_file_attributes & FILE_ATTRIBUTE_HIDDEN) # type: ignore
+ is_hidden = bool(filest.st_file_attributes & FILE_ATTRIBUTE_HIDDEN) # type: ignore
if include_folders and file.is_dir():
if (is_hidden and include_hidden) or not is_hidden:
folders.append({"file": file, "filest": filest, "is_dir": True})
@@ -240,53 +234,65 @@ class Utilities:
if len(include_ext) == 0 or 'all_files' in include_ext \
or splitext(file.name)[1].lstrip('.').upper() in (ext.upper() for ext in include_ext):
if (is_hidden and include_hidden) or not is_hidden:
- files.append({"file": file, "filest": filest, "is_dir": False})
+ files.append(
+ {"file": file, "filest": filest, "is_dir": False}
+ )
# Filter logic
if filter_for is not None:
try:
if re.compile(filter_for):
- files = list(filter(lambda file: re.search(filter_for, file["file"].name) != None, files))
+ files = list(
+ filter(
+ lambda file: re.search(filter_for, file["file"].name)
+ != None,
+ files,
+ )
+ )
except re.error:
- files = list(filter(lambda file: file["file"].name.find(filter_for) != -1, files))
-
+ files = list(
+ filter(lambda file: file["file"].name.find(filter_for) != -1, files)
+ )
+
# Ordering logic
ord_arg = order_by.split("_")
ord = ord_arg[0]
rev = True if ord_arg[1] == "asc" else False
match ord:
- case 'name':
- files.sort(key=lambda x: x['file'].name.casefold(), reverse = rev)
- folders.sort(key=lambda x: x['file'].name.casefold(), reverse = rev)
- case 'modified':
- files.sort(key=lambda x: x['filest'].st_mtime, reverse = not rev)
- folders.sort(key=lambda x: x['filest'].st_mtime, reverse = not rev)
- case 'created':
- files.sort(key=lambda x: x['filest'].st_ctime, reverse = not rev)
- folders.sort(key=lambda x: x['filest'].st_ctime, reverse = not rev)
- case 'size':
- files.sort(key=lambda x: x['filest'].st_size, reverse = not rev)
+ case "name":
+ files.sort(key=lambda x: x["file"].name.casefold(), reverse=rev)
+ folders.sort(key=lambda x: x["file"].name.casefold(), reverse=rev)
+ case "modified":
+ files.sort(key=lambda x: x["filest"].st_mtime, reverse=not rev)
+ folders.sort(key=lambda x: x["filest"].st_mtime, reverse=not rev)
+ case "created":
+ files.sort(key=lambda x: x["filest"].st_ctime, reverse=not rev)
+ folders.sort(key=lambda x: x["filest"].st_ctime, reverse=not rev)
+ case "size":
+ files.sort(key=lambda x: x["filest"].st_size, reverse=not rev)
# Folders has no file size, order by name instead
- folders.sort(key=lambda x: x['file'].name.casefold())
+ folders.sort(key=lambda x: x["file"].name.casefold())
case _:
- files.sort(key=lambda x: x['file'].name.casefold(), reverse = rev)
- folders.sort(key=lambda x: x['file'].name.casefold(), reverse = rev)
-
- #Constructing the final file list, folders first
- all = [{
- "isdir": x['is_dir'],
- "name": str(x['file'].name),
- "realpath": str(x['file']),
- "size": x['filest'].st_size,
- "modified": x['filest'].st_mtime,
- "created": x['filest'].st_ctime,
- } for x in folders + files ]
+ files.sort(key=lambda x: x["file"].name.casefold(), reverse=rev)
+ folders.sort(key=lambda x: x["file"].name.casefold(), reverse=rev)
+
+ # Constructing the final file list, folders first
+ all = [
+ {
+ "isdir": x["is_dir"],
+ "name": str(x["file"].name),
+ "realpath": str(x["file"]),
+ "size": x["filest"].st_size,
+ "modified": x["filest"].st_mtime,
+ "created": x["filest"].st_ctime,
+ }
+ for x in folders + files
+ ]
return {
"realpath": str(path),
- "files": all[(page-1)*max:(page)*max],
+ "files": all[(page - 1) * max : (page) * max],
"total": len(all),
}
-
# Based on https://stackoverflow.com/a/46422554/13174603
def start_rdt_proxy(self, ip: str, port: int):
@@ -296,10 +302,10 @@ class Utilities:
writer.write(await reader.read(2048))
finally:
writer.close()
+
async def handle_client(local_reader: StreamReader, local_writer: StreamWriter):
try:
- remote_reader, remote_writer = await open_connection(
- ip, port)
+ remote_reader, remote_writer = await open_connection(ip, port)
pipe1 = pipe(local_reader, remote_writer)
pipe2 = pipe(remote_reader, local_writer)
await gather(pipe1, pipe2)
@@ -324,8 +330,11 @@ class Utilities:
if ip != None:
self.logger.info("Connecting to React DevTools at " + ip)
async with ClientSession() as web:
- res = await web.request("GET", "http://" + ip + ":8097", ssl=helpers.get_ssl_context())
- script = """
+ res = await web.request(
+ "GET", "http://" + ip + ":8097", ssl=helpers.get_ssl_context()
+ )
+ script = (
+ """
if (!window.deckyHasConnectedRDT) {
window.deckyHasConnectedRDT = true;
// This fixes the overlay when hovering over an element in RDT
@@ -336,7 +345,10 @@ class Utilities:
return (GamepadNavTree?.m_context?.m_controller || FocusNavController)?.m_ActiveContext?.ActiveWindow || window;
}
});
- """ + await res.text() + "\n}"
+ """
+ + await res.text()
+ + "\n}"
+ )
if res.status != 200:
self.logger.error("Failed to connect to React DevTools at " + ip)
return False
@@ -364,10 +376,24 @@ class Utilities:
self.logger.info("React DevTools disabled")
async def get_user_info(self) -> Dict[str, str]:
- return {
- "username": get_username(),
- "path": get_home_path()
- }
-
+ return {"username": get_username(), "path": get_home_path()}
+
async def get_tab_id(self, name: str):
return (await get_tab(name)).id
+
+ async def get_plugins_with_logs(self):
+ return list(self.context.plugin_loader.plugins.keys())
+
+ async def get_plugin_logs(self, plugin_name: str):
+ log_path = Path(helpers.get_homebrew_path()) / "logs" / \
+ self.context.plugin_loader.plugins[plugin_name].plugin_directory
+ return listdir(log_path)
+
+ async def get_plugin_log_text(self, plugin_name: str, log_name: str):
+ log_path = Path(helpers.get_homebrew_path()) / "logs" / \
+ self.context.plugin_loader.plugins[plugin_name].plugin_directory / log_name
+ with open(log_path, "r") as log_file:
+ return log_file.read()
+
+ async def upload_log(self, plugin_name: str, log_name: str):
+ raise RuntimeError("Not Implemented") \ No newline at end of file