summaryrefslogtreecommitdiff
path: root/backend/utilities.py
diff options
context:
space:
mode:
Diffstat (limited to 'backend/utilities.py')
-rw-r--r--backend/utilities.py74
1 files changed, 43 insertions, 31 deletions
diff --git a/backend/utilities.py b/backend/utilities.py
index 72b6f008..d45bec9b 100644
--- a/backend/utilities.py
+++ b/backend/utilities.py
@@ -1,3 +1,4 @@
+from os import stat_result
import uuid
from json.decoder import JSONDecodeError
from os.path import splitext
@@ -5,12 +6,12 @@ import re
from traceback import format_exc
from stat import FILE_ATTRIBUTE_HIDDEN # type: ignore
-from asyncio import start_server, gather, open_connection
+from asyncio import StreamReader, StreamWriter, start_server, gather, open_connection
from aiohttp import ClientSession, web
-from typing import Dict
+from typing import Callable, Coroutine, Dict, Any, List, TypedDict
from logging import getLogger
-from backend.browser import PluginInstallType
+from backend.browser import PluginInstallRequest, PluginInstallType
from backend.main import PluginManager
from injector import inject_to_tab, get_gamepadui_tab, close_old_tabs, get_tab
from pathlib import Path
@@ -18,10 +19,15 @@ from localplatform import ON_WINDOWS
import helpers
from localplatform import service_stop, service_start, get_home_path, get_username
+class FilePickerObj(TypedDict):
+ file: Path
+ filest: stat_result
+ is_dir: bool
+
class Utilities:
def __init__(self, context: PluginManager) -> None:
self.context = context
- self.util_methods: Dict[] = {
+ self.util_methods: Dict[str, Callable[..., Coroutine[Any, Any, Any]]] = {
"ping": self.ping,
"http_request": self.http_request,
"install_plugin": self.install_plugin,
@@ -54,7 +60,7 @@ class Utilities:
web.post("/methods/{method_name}", self._handle_server_method_call)
])
- async def _handle_server_method_call(self, request):
+ async def _handle_server_method_call(self, request: web.Request):
method_name = request.match_info["method_name"]
try:
args = await request.json()
@@ -70,7 +76,7 @@ class Utilities:
res["success"] = False
return web.json_response(res)
- async def install_plugin(self, artifact="", name="No name", version="dev", hash=False, install_type=PluginInstallType.INSTALL):
+ async def install_plugin(self, artifact: str="", name: str="No name", version: str="dev", hash: str="", install_type: PluginInstallType=PluginInstallType.INSTALL):
return await self.context.plugin_browser.request_plugin_install(
artifact=artifact,
name=name,
@@ -79,21 +85,21 @@ class Utilities:
install_type=install_type
)
- async def install_plugins(self, requests):
+ async def install_plugins(self, requests: List[PluginInstallRequest]):
return await self.context.plugin_browser.request_multiple_plugin_installs(
requests=requests
)
- async def confirm_plugin_install(self, request_id):
+ async def confirm_plugin_install(self, request_id: str):
return await self.context.plugin_browser.confirm_plugin_install(request_id)
- def cancel_plugin_install(self, request_id):
+ async def cancel_plugin_install(self, request_id: str):
return self.context.plugin_browser.cancel_plugin_install(request_id)
- async def uninstall_plugin(self, name):
+ async def uninstall_plugin(self, name: str):
return await self.context.plugin_browser.uninstall_plugin(name)
- async def http_request(self, method="", url="", **kwargs):
+ async def http_request(self, method: str="", url: str="", **kwargs: Any):
async with ClientSession() as web:
res = await web.request(method, url, ssl=helpers.get_ssl_context(), **kwargs)
text = await res.text()
@@ -103,12 +109,13 @@ class Utilities:
"body": text
}
- async def ping(self, **kwargs):
+ async def ping(self, **kwargs: Any):
return "pong"
- async def execute_in_tab(self, tab, run_async, code):
+ async def execute_in_tab(self, tab: str, run_async: bool, code: str):
try:
result = await inject_to_tab(tab, code, run_async)
+ assert result
if "exceptionDetails" in result["result"]:
return {
"success": False,
@@ -125,7 +132,7 @@ class Utilities:
"result": e
}
- async def inject_css_into_tab(self, tab, style):
+ async def inject_css_into_tab(self, tab: str, style: str):
try:
css_id = str(uuid.uuid4())
@@ -139,7 +146,7 @@ class Utilities:
}})()
""", False)
- if "exceptionDetails" in result["result"]:
+ if result and "exceptionDetails" in result["result"]:
return {
"success": False,
"result": result["result"]
@@ -155,7 +162,7 @@ class Utilities:
"result": e
}
- async def remove_css_from_tab(self, tab, css_id):
+ async def remove_css_from_tab(self, tab: str, css_id: str):
try:
result = await inject_to_tab(tab,
f"""
@@ -167,7 +174,7 @@ class Utilities:
}})()
""", False)
- if "exceptionDetails" in result["result"]:
+ if result and "exceptionDetails" in result["result"]:
return {
"success": False,
"result": result
@@ -182,10 +189,10 @@ class Utilities:
"result": e
}
- async def get_setting(self, key, default):
+ async def get_setting(self, key: str, default: Any):
return self.context.settings.getSetting(key, default)
- async def set_setting(self, key, value):
+ async def set_setting(self, key: str, value: Any):
return self.context.settings.setSetting(key, value)
async def allow_remote_debugging(self):
@@ -210,17 +217,18 @@ class Utilities:
if path == None:
path = get_home_path()
- path = Path(path).resolve()
+ path_obj = Path(path).resolve()
- files, folders = [], []
+ files: List[FilePickerObj] = []
+ folders: List[FilePickerObj] = []
#Resolving all files/folders in the requested directory
- for file in path.iterdir():
+ for file in path_obj.iterdir():
if file.exists():
filest = file.stat()
is_hidden = file.name.startswith('.')
if ON_WINDOWS and not is_hidden:
- is_hidden = bool(filest.st_file_attributes & FILE_ATTRIBUTE_HIDDEN)
+ is_hidden = bool(filest.st_file_attributes & FILE_ATTRIBUTE_HIDDEN) # type: ignore
if include_folders and file.is_dir():
if (is_hidden and include_hidden) or not is_hidden:
folders.append({"file": file, "filest": filest, "is_dir": True})
@@ -234,9 +242,9 @@ class Utilities:
if filter_for is not None:
try:
if re.compile(filter_for):
- files = filter(lambda file: re.search(filter_for, file.name) != None, files)
+ files = list(filter(lambda file: re.search(filter_for, file["file"].name) != None, files))
except re.error:
- files = filter(lambda file: file.name.find(filter_for) != -1, files)
+ files = list(filter(lambda file: file["file"].name.find(filter_for) != -1, files))
# Ordering logic
ord_arg = order_by.split("_")
@@ -256,6 +264,9 @@ class Utilities:
files.sort(key=lambda x: x['filest'].st_size, reverse = not rev)
# Folders has no file size, order by name instead
folders.sort(key=lambda x: x['file'].name.casefold())
+ case _:
+ files.sort(key=lambda x: x['file'].name.casefold(), reverse = rev)
+ folders.sort(key=lambda x: x['file'].name.casefold(), reverse = rev)
#Constructing the final file list, folders first
all = [{
@@ -275,14 +286,14 @@ class Utilities:
# Based on https://stackoverflow.com/a/46422554/13174603
- def start_rdt_proxy(self, ip, port):
- async def pipe(reader, writer):
+ def start_rdt_proxy(self, ip: str, port: int):
+ async def pipe(reader: StreamReader, writer: StreamWriter):
try:
while not reader.at_eof():
writer.write(await reader.read(2048))
finally:
writer.close()
- async def handle_client(local_reader, local_writer):
+ async def handle_client(local_reader: StreamReader, local_writer: StreamWriter):
try:
remote_reader, remote_writer = await open_connection(
ip, port)
@@ -298,7 +309,8 @@ class Utilities:
def stop_rdt_proxy(self):
if self.rdt_proxy_server:
self.rdt_proxy_server.close()
- self.rdt_proxy_task.cancel()
+ if self.rdt_proxy_task:
+ self.rdt_proxy_task.cancel()
async def _enable_rdt(self):
# TODO un-hardcode port
@@ -348,11 +360,11 @@ class Utilities:
await tab.evaluate_js("location.reload();", False, True, False)
self.logger.info("React DevTools disabled")
- async def get_user_info(self) -> dict:
+ async def get_user_info(self) -> Dict[str, str]:
return {
"username": get_username(),
"path": get_home_path()
}
- async def get_tab_id(self, name):
+ async def get_tab_id(self, name: str):
return (await get_tab(name)).id