summaryrefslogtreecommitdiff
path: root/backend/loader.py
diff options
context:
space:
mode:
Diffstat (limited to 'backend/loader.py')
-rw-r--r--backend/loader.py128
1 files changed, 95 insertions, 33 deletions
diff --git a/backend/loader.py b/backend/loader.py
index 48a66a8d..3ed3b303 100644
--- a/backend/loader.py
+++ b/backend/loader.py
@@ -21,7 +21,7 @@ from plugin import PluginWrapper
class FileChangeHandler(RegexMatchingEventHandler):
def __init__(self, queue, plugin_path) -> None:
- super().__init__(regexes=[r'^.*?dist\/index\.js$', r'^.*?main\.py$'])
+ super().__init__(regexes=[r"^.*?dist\/index\.js$", r"^.*?main\.py$"])
self.logger = getLogger("file-watcher")
self.plugin_path = plugin_path
self.queue = queue
@@ -32,7 +32,9 @@ class FileChangeHandler(RegexMatchingEventHandler):
return
plugin_dir = Path(path.relpath(src_path, self.plugin_path)).parts[0]
if exists(path.join(self.plugin_path, plugin_dir, "plugin.json")):
- self.queue.put_nowait((path.join(self.plugin_path, plugin_dir, "main.py"), plugin_dir, True))
+ self.queue.put_nowait(
+ (path.join(self.plugin_path, plugin_dir, "main.py"), plugin_dir, True)
+ )
def on_created(self, event):
src_path = event.src_path
@@ -62,6 +64,7 @@ class FileChangeHandler(RegexMatchingEventHandler):
self.logger.debug(f"file modified: {src_path}")
self.maybe_reload(src_path)
+
class Loader:
def __init__(self, server_instance, plugin_path, loop, live_reload=False) -> None:
self.loop = loop
@@ -81,18 +84,30 @@ class Loader:
self.loop.create_task(self.handle_reloads())
self.loop.create_task(self.enable_reload_wait())
- server_instance.add_routes([
- web.get("/frontend/{path:.*}", self.handle_frontend_assets),
- web.get("/plugins", self.get_plugins),
- web.get("/plugins/{plugin_name}/frontend_bundle", self.handle_frontend_bundle),
- web.post("/plugins/{plugin_name}/methods/{method_name}", self.handle_plugin_method_call),
- web.get("/plugins/{plugin_name}/assets/{path:.*}", self.handle_plugin_frontend_assets),
-
- # The following is legacy plugin code.
- web.get("/plugins/load_main/{name}", self.load_plugin_main_view),
- web.get("/plugins/plugin_resource/{name}/{path:.+}", self.handle_sub_route),
- web.get("/steam_resource/{path:.+}", self.get_steam_resource)
- ])
+ server_instance.add_routes(
+ [
+ web.get("/frontend/{path:.*}", self.handle_frontend_assets),
+ web.get("/plugins", self.get_plugins),
+ web.get(
+ "/plugins/{plugin_name}/frontend_bundle",
+ self.handle_frontend_bundle,
+ ),
+ web.post(
+ "/plugins/{plugin_name}/methods/{method_name}",
+ self.handle_plugin_method_call,
+ ),
+ web.get(
+ "/plugins/{plugin_name}/assets/{path:.*}",
+ self.handle_plugin_frontend_assets,
+ ),
+ # The following is legacy plugin code.
+ web.get("/plugins/load_main/{name}", self.load_plugin_main_view),
+ web.get(
+ "/plugins/plugin_resource/{name}/{path:.+}", self.handle_sub_route
+ ),
+ web.get("/steam_resource/{path:.+}", self.get_steam_resource),
+ ]
+ )
async def enable_reload_wait(self):
if self.live_reload:
@@ -107,36 +122,63 @@ class Loader:
async def get_plugins(self, request):
plugins = list(self.plugins.values())
- return web.json_response([{"name": str(i) if not i.legacy else "$LEGACY_"+str(i), "version": i.version} for i in plugins])
+ return web.json_response(
+ [
+ {
+ "name": str(i) if not i.legacy else "$LEGACY_" + str(i),
+ "version": i.version,
+ }
+ for i in plugins
+ ]
+ )
def handle_plugin_frontend_assets(self, request):
plugin = self.plugins[request.match_info["plugin_name"]]
- file = path.join(self.plugin_path, plugin.plugin_directory, "dist/assets", request.match_info["path"])
+ file = path.join(
+ self.plugin_path,
+ plugin.plugin_directory,
+ "dist/assets",
+ request.match_info["path"],
+ )
return web.FileResponse(file, headers={"Cache-Control": "no-cache"})
def handle_frontend_bundle(self, request):
plugin = self.plugins[request.match_info["plugin_name"]]
- with open(path.join(self.plugin_path, plugin.plugin_directory, "dist/index.js"), "r", encoding="utf-8") as bundle:
- return web.Response(text=bundle.read(), content_type="application/javascript")
+ with open(
+ path.join(self.plugin_path, plugin.plugin_directory, "dist/index.js"),
+ "r",
+ encoding="utf-8",
+ ) as bundle:
+ return web.Response(
+ text=bundle.read(), content_type="application/javascript"
+ )
def import_plugin(self, file, plugin_directory, refresh=False, batch=False):
try:
plugin = PluginWrapper(file, plugin_directory, self.plugin_path)
if plugin.name in self.plugins:
- if not "debug" in plugin.flags and refresh:
- self.logger.info(f"Plugin {plugin.name} is already loaded and has requested to not be re-loaded")
- return
- else:
- self.plugins[plugin.name].stop()
- self.plugins.pop(plugin.name, None)
+ if "debug" not in plugin.flags and refresh:
+ self.logger.info(
+ f"Plugin {plugin.name} is already loaded and has requested to"
+ " not be re-loaded"
+ )
+ return
+ else:
+ self.plugins[plugin.name].stop()
+ self.plugins.pop(plugin.name, None)
if plugin.passive:
self.logger.info(f"Plugin {plugin.name} is passive")
self.plugins[plugin.name] = plugin.start()
self.logger.info(f"Loaded {plugin.name}")
if not batch:
- self.loop.create_task(self.dispatch_plugin(plugin.name if not plugin.legacy else "$LEGACY_" + plugin.name, plugin.version))
+ self.loop.create_task(
+ self.dispatch_plugin(
+ plugin.name if not plugin.legacy else "$LEGACY_" + plugin.name,
+ plugin.version,
+ )
+ )
except Exception as e:
self.logger.error(f"Could not load {file}. {e}")
print_exc()
@@ -148,10 +190,20 @@ class Loader:
def import_plugins(self):
self.logger.info(f"import plugins from {self.plugin_path}")
- directories = [i for i in listdir(self.plugin_path) if path.isdir(path.join(self.plugin_path, i)) and path.isfile(path.join(self.plugin_path, i, "plugin.json"))]
+ directories = [
+ i
+ for i in listdir(self.plugin_path)
+ if path.isdir(path.join(self.plugin_path, i))
+ and path.isfile(path.join(self.plugin_path, i, "plugin.json"))
+ ]
for directory in directories:
self.logger.info(f"found plugin: {directory}")
- self.import_plugin(path.join(self.plugin_path, directory, "main.py"), directory, False, True)
+ self.import_plugin(
+ path.join(self.plugin_path, directory, "main.py"),
+ directory,
+ False,
+ True,
+ )
async def handle_reloads(self):
while True:
@@ -168,10 +220,10 @@ class Loader:
except JSONDecodeError:
args = {}
try:
- if method_name.startswith("_"):
- raise RuntimeError("Tried to call private method")
- res["result"] = await plugin.execute_method(method_name, args)
- res["success"] = True
+ if method_name.startswith("_"):
+ raise RuntimeError("Tried to call private method")
+ res["result"] = await plugin.execute_method(method_name, args)
+ res["success"] = True
except Exception as e:
res["result"] = str(e)
res["success"] = False
@@ -184,9 +236,14 @@ class Loader:
can introduce it more smoothly and give people the chance to sample the new features even
without plugin support. They will be removed once legacy plugins are no longer relevant.
"""
+
async def load_plugin_main_view(self, request):
plugin = self.plugins[request.match_info["name"]]
- with open(path.join(self.plugin_path, plugin.plugin_directory, plugin.main_view_html), "r", encoding="utf-8") as template:
+ with open(
+ path.join(self.plugin_path, plugin.plugin_directory, plugin.main_view_html),
+ "r",
+ encoding="utf-8",
+ ) as template:
template_data = template.read()
ret = f"""
<script src="/legacy/library.js"></script>
@@ -210,6 +267,11 @@ class Loader:
async def get_steam_resource(self, request):
tab = await get_tab("SP")
try:
- return web.Response(text=await tab.get_steam_resource(f"https://steamloopback.host/{request.match_info['path']}"), content_type="text/html")
+ return web.Response(
+ text=await tab.get_steam_resource(
+ f"https://steamloopback.host/{request.match_info['path']}"
+ ),
+ content_type="text/html",
+ )
except Exception as e:
return web.Response(text=str(e), status=400)