summaryrefslogtreecommitdiff
path: root/backend/loader.py
diff options
context:
space:
mode:
Diffstat (limited to 'backend/loader.py')
-rw-r--r--backend/loader.py69
1 files changed, 38 insertions, 31 deletions
diff --git a/backend/loader.py b/backend/loader.py
index d07b1c08..89bb0457 100644
--- a/backend/loader.py
+++ b/backend/loader.py
@@ -1,34 +1,40 @@
-from asyncio import Queue, sleep
+from asyncio import AbstractEventLoop, Queue, sleep
from json.decoder import JSONDecodeError
from logging import getLogger
from os import listdir, path
from pathlib import Path
from traceback import print_exc
+from typing import Any, Tuple
from aiohttp import web
from os.path import exists
-from watchdog.events import RegexMatchingEventHandler
-from watchdog.observers import Observer
+from watchdog.events import RegexMatchingEventHandler, DirCreatedEvent, DirModifiedEvent, FileCreatedEvent, FileModifiedEvent # type: ignore
+from watchdog.observers import Observer # type: ignore
-from injector import get_tab, get_gamepadui_tab
-from plugin import PluginWrapper
+from backend.main import PluginManager # type: ignore
+
+from .injector import get_tab, get_gamepadui_tab
+from .plugin import PluginWrapper
+
+Plugins = dict[str, PluginWrapper]
+ReloadQueue = Queue[Tuple[str, str, bool | None] | Tuple[str, str]]
class FileChangeHandler(RegexMatchingEventHandler):
- def __init__(self, queue, plugin_path) -> None:
- super().__init__(regexes=[r'^.*?dist\/index\.js$', r'^.*?main\.py$'])
+ def __init__(self, queue: ReloadQueue, plugin_path: str) -> None:
+ super().__init__(regexes=[r'^.*?dist\/index\.js$', r'^.*?main\.py$']) # type: ignore
self.logger = getLogger("file-watcher")
self.plugin_path = plugin_path
self.queue = queue
self.disabled = True
- def maybe_reload(self, src_path):
+ def maybe_reload(self, src_path: str):
if self.disabled:
return
plugin_dir = Path(path.relpath(src_path, self.plugin_path)).parts[0]
if exists(path.join(self.plugin_path, plugin_dir, "plugin.json")):
self.queue.put_nowait((path.join(self.plugin_path, plugin_dir, "main.py"), plugin_dir, True))
- def on_created(self, event):
+ def on_created(self, event: DirCreatedEvent | FileCreatedEvent):
src_path = event.src_path
if "__pycache__" in src_path:
return
@@ -42,7 +48,7 @@ class FileChangeHandler(RegexMatchingEventHandler):
self.logger.debug(f"file created: {src_path}")
self.maybe_reload(src_path)
- def on_modified(self, event):
+ def on_modified(self, event: DirModifiedEvent | FileModifiedEvent):
src_path = event.src_path
if "__pycache__" in src_path:
return
@@ -57,25 +63,25 @@ class FileChangeHandler(RegexMatchingEventHandler):
self.maybe_reload(src_path)
class Loader:
- def __init__(self, server_instance, plugin_path, loop, live_reload=False) -> None:
+ def __init__(self, server_instance: PluginManager, plugin_path: str, loop: AbstractEventLoop, live_reload: bool =False) -> None:
self.loop = loop
self.logger = getLogger("Loader")
self.plugin_path = plugin_path
self.logger.info(f"plugin_path: {self.plugin_path}")
- self.plugins : dict[str, PluginWrapper] = {}
+ self.plugins: Plugins = {}
self.watcher = None
self.live_reload = live_reload
- self.reload_queue = Queue()
+ self.reload_queue: ReloadQueue = Queue()
self.loop.create_task(self.handle_reloads())
if live_reload:
self.observer = Observer()
self.watcher = FileChangeHandler(self.reload_queue, plugin_path)
- self.observer.schedule(self.watcher, self.plugin_path, recursive=True)
+ self.observer.schedule(self.watcher, self.plugin_path, recursive=True) # type: ignore
self.observer.start()
self.loop.create_task(self.enable_reload_wait())
- server_instance.add_routes([
+ server_instance.web_app.add_routes([
web.get("/frontend/{path:.*}", self.handle_frontend_assets),
web.get("/locales/{path:.*}", self.handle_frontend_locales),
web.get("/plugins", self.get_plugins),
@@ -93,15 +99,16 @@ class Loader:
async def enable_reload_wait(self):
if self.live_reload:
await sleep(10)
- self.logger.info("Hot reload enabled")
- self.watcher.disabled = False
+ if self.watcher:
+ self.logger.info("Hot reload enabled")
+ self.watcher.disabled = False
- async def handle_frontend_assets(self, request):
+ async def handle_frontend_assets(self, request: web.Request):
file = path.join(path.dirname(__file__), "static", request.match_info["path"])
return web.FileResponse(file, headers={"Cache-Control": "no-cache"})
- async def handle_frontend_locales(self, request):
+ async def handle_frontend_locales(self, request: web.Request):
req_lang = request.match_info["path"]
file = path.join(path.dirname(__file__), "locales", req_lang)
if exists(file):
@@ -110,23 +117,23 @@ class Loader:
self.logger.info(f"Language {req_lang} not available, returning an empty dictionary")
return web.json_response(data={}, headers={"Cache-Control": "no-cache"})
- async def get_plugins(self, request):
+ async def get_plugins(self, request: web.Request):
plugins = list(self.plugins.values())
return web.json_response([{"name": str(i) if not i.legacy else "$LEGACY_"+str(i), "version": i.version} for i in plugins])
- def handle_plugin_frontend_assets(self, request):
+ async def handle_plugin_frontend_assets(self, request: web.Request):
plugin = self.plugins[request.match_info["plugin_name"]]
file = path.join(self.plugin_path, plugin.plugin_directory, "dist/assets", request.match_info["path"])
return web.FileResponse(file, headers={"Cache-Control": "no-cache"})
- def handle_frontend_bundle(self, request):
+ async def handle_frontend_bundle(self, request: web.Request):
plugin = self.plugins[request.match_info["plugin_name"]]
with open(path.join(self.plugin_path, plugin.plugin_directory, "dist/index.js"), "r", encoding="utf-8") as bundle:
return web.Response(text=bundle.read(), content_type="application/javascript")
- def import_plugin(self, file, plugin_directory, refresh=False, batch=False):
+ def import_plugin(self, file: str, plugin_directory: str, refresh: bool | None = False, batch: bool | None = False):
try:
plugin = PluginWrapper(file, plugin_directory, self.plugin_path)
if plugin.name in self.plugins:
@@ -146,7 +153,7 @@ class Loader:
self.logger.error(f"Could not load {file}. {e}")
print_exc()
- async def dispatch_plugin(self, name, version):
+ async def dispatch_plugin(self, name: str, version: str | None):
gpui_tab = await get_gamepadui_tab()
await gpui_tab.evaluate_js(f"window.importDeckyPlugin('{name}', '{version}')")
@@ -161,15 +168,15 @@ class Loader:
async def handle_reloads(self):
while True:
args = await self.reload_queue.get()
- self.import_plugin(*args)
+ self.import_plugin(*args) # type: ignore
- async def handle_plugin_method_call(self, request):
+ async def handle_plugin_method_call(self, request: web.Request):
res = {}
plugin = self.plugins[request.match_info["plugin_name"]]
method_name = request.match_info["method_name"]
try:
method_info = await request.json()
- args = method_info["args"]
+ args: Any = method_info["args"]
except JSONDecodeError:
args = {}
try:
@@ -189,7 +196,7 @@ class Loader:
can introduce it more smoothly and give people the chance to sample the new features even
without plugin support. They will be removed once legacy plugins are no longer relevant.
"""
- async def load_plugin_main_view(self, request):
+ async def load_plugin_main_view(self, request: web.Request):
plugin = self.plugins[request.match_info["name"]]
with open(path.join(self.plugin_path, plugin.plugin_directory, plugin.main_view_html), "r", encoding="utf-8") as template:
template_data = template.read()
@@ -201,7 +208,7 @@ class Loader:
"""
return web.Response(text=ret, content_type="text/html")
- async def handle_sub_route(self, request):
+ async def handle_sub_route(self, request: web.Request):
plugin = self.plugins[request.match_info["name"]]
route_path = request.match_info["path"]
self.logger.info(path)
@@ -212,14 +219,14 @@ class Loader:
return web.Response(text=ret)
- async def get_steam_resource(self, request):
+ async def get_steam_resource(self, request: web.Request):
tab = await get_tab("SP")
try:
return web.Response(text=await tab.get_steam_resource(f"https://steamloopback.host/{request.match_info['path']}"), content_type="text/html")
except Exception as e:
return web.Response(text=str(e), status=400)
- async def handle_backend_reload_request(self, request):
+ async def handle_backend_reload_request(self, request: web.Request):
plugin_name : str = request.match_info["plugin_name"]
plugin = self.plugins[plugin_name]