summaryrefslogtreecommitdiff
path: root/plugin_loader/loader.py
diff options
context:
space:
mode:
authorJonas Dellinger <jonas@dellinger.dev>2022-05-13 19:14:47 +0200
committerJonas Dellinger <jonas@dellinger.dev>2022-05-13 19:14:47 +0200
commit74438a31458af8bddd08d90eacc6d63677bab844 (patch)
treea7bfc044941f65c7f9971c5386c463eac31be768 /plugin_loader/loader.py
parent945db5de4788feefebc845817752472419051640 (diff)
downloaddecky-loader-74438a31458af8bddd08d90eacc6d63677bab844.tar.gz
decky-loader-74438a31458af8bddd08d90eacc6d63677bab844.zip
Work on react frontend loader
Diffstat (limited to 'plugin_loader/loader.py')
-rw-r--r--plugin_loader/loader.py191
1 files changed, 0 insertions, 191 deletions
diff --git a/plugin_loader/loader.py b/plugin_loader/loader.py
deleted file mode 100644
index fea5f149..00000000
--- a/plugin_loader/loader.py
+++ /dev/null
@@ -1,191 +0,0 @@
-from aiohttp import web
-from aiohttp_jinja2 import template
-from watchdog.observers.polling import PollingObserver as Observer
-from watchdog.events import FileSystemEventHandler
-from asyncio import Queue
-from os import path, listdir
-from logging import getLogger
-from time import time
-
-from injector import get_tabs, get_tab
-from plugin import PluginWrapper
-from traceback import print_exc
-
-class FileChangeHandler(FileSystemEventHandler):
- def __init__(self, queue, plugin_path) -> None:
- super().__init__()
- self.logger = getLogger("file-watcher")
- self.plugin_path = plugin_path
- self.queue = queue
-
- def on_created(self, event):
- src_path = event.src_path
- if "__pycache__" in src_path:
- return
-
- # check to make sure this isn't a directory
- if path.isdir(src_path):
- return
-
- # get the directory name of the plugin so that we can find its "main.py" and reload it; the
- # file that changed is not necessarily the one that needs to be reloaded
- self.logger.debug(f"file created: {src_path}")
- rel_path = path.relpath(src_path, path.commonprefix([self.plugin_path, src_path]))
- plugin_dir = path.split(rel_path)[0]
- main_file_path = path.join(self.plugin_path, plugin_dir, "main.py")
- self.queue.put_nowait((main_file_path, plugin_dir, True))
-
- def on_modified(self, event):
- src_path = event.src_path
- if "__pycache__" in src_path:
- return
-
- # check to make sure this isn't a directory
- if path.isdir(src_path):
- return
-
- # get the directory name of the plugin so that we can find its "main.py" and reload it; the
- # file that changed is not necessarily the one that needs to be reloaded
- self.logger.debug(f"file modified: {src_path}")
- plugin_dir = path.split(path.relpath(src_path, path.commonprefix([self.plugin_path, src_path])))[0]
- self.queue.put_nowait((path.join(self.plugin_path, plugin_dir, "main.py"), plugin_dir, True))
-
-class Loader:
- def __init__(self, server_instance, plugin_path, loop, live_reload=False) -> None:
- self.loop = loop
- self.logger = getLogger("Loader")
- self.plugin_path = plugin_path
- self.logger.info(f"plugin_path: {self.plugin_path}")
- self.plugins = {}
- self.callsigns = {}
- self.callsign_matches = {}
- self.import_plugins()
-
- if live_reload:
- self.reload_queue = Queue()
- self.observer = Observer()
- self.observer.schedule(FileChangeHandler(self.reload_queue, plugin_path), self.plugin_path, recursive=True)
- self.observer.start()
- self.loop.create_task(self.handle_reloads())
-
- server_instance.add_routes([
- web.get("/plugins/iframe", self.plugin_iframe_route),
- web.get("/plugins/load_main/{name}", self.load_plugin_main_view),
- web.get("/plugins/plugin_resource/{name}/{path:.+}", self.handle_sub_route),
- web.get("/plugins/load_tile/{name}", self.load_plugin_tile_view),
- web.get("/steam_resource/{path:.+}", self.get_steam_resource)
- ])
-
- def import_plugin(self, file, plugin_directory, refresh=False):
- try:
- plugin = PluginWrapper(file, plugin_directory, self.plugin_path)
- if plugin.name in self.plugins:
- if not "debug" in plugin.flags and refresh:
- self.logger.info(f"Plugin {plugin.name} is already loaded and has requested to not be re-loaded")
- return
- else:
- self.plugins[plugin.name].stop()
- self.plugins.pop(plugin.name, None)
- self.callsigns.pop(self.callsign_matches[file], None)
- if plugin.passive:
- self.logger.info(f"Plugin {plugin.name} is passive")
- callsign = str(time())
- plugin.callsign = callsign
- self.plugins[plugin.name] = plugin.start()
- self.callsigns[callsign] = plugin
- self.callsign_matches[file] = callsign
- self.logger.info(f"Loaded {plugin.name}")
- except Exception as e:
- self.logger.error(f"Could not load {file}. {e}")
- print_exc()
- finally:
- if refresh:
- self.loop.create_task(self.refresh_iframe())
-
- def import_plugins(self):
- self.logger.info(f"import plugins from {self.plugin_path}")
-
- directories = [i for i in listdir(self.plugin_path) if path.isdir(path.join(self.plugin_path, i)) and path.isfile(path.join(self.plugin_path, i, "plugin.json"))]
- for directory in directories:
- self.logger.info(f"found plugin: {directory}")
- self.import_plugin(path.join(self.plugin_path, directory, "main.py"), directory)
-
- async def handle_reloads(self):
- while True:
- args = await self.reload_queue.get()
- self.import_plugin(*args)
-
- async def handle_plugin_method_call(self, callsign, method_name, **kwargs):
- if method_name.startswith("_"):
- raise RuntimeError("Tried to call private method")
- return await self.callsigns[callsign].execute_method(method_name, kwargs)
-
- async def get_steam_resource(self, request):
- tab = (await get_tabs())[0]
- try:
- return web.Response(text=await tab.get_steam_resource(f"https://steamloopback.host/{request.match_info['path']}"), content_type="text/html")
- except Exception as e:
- return web.Response(text=str(e), status=400)
-
- async def load_plugin_main_view(self, request):
- plugin = self.callsigns[request.match_info["name"]]
-
- # open up the main template
- with open(path.join(self.plugin_path, plugin.plugin_directory, plugin.main_view_html), 'r') as template:
- template_data = template.read()
- # setup the main script, plugin, and pull in the template
- ret = f"""
- <script src="/static/library.js"></script>
- <script>const plugin_name = '{plugin.callsign}' </script>
- <base href="http://127.0.0.1:1337/plugins/plugin_resource/{plugin.callsign}/">
- {template_data}
- """
- return web.Response(text=ret, content_type="text/html")
-
- async def handle_sub_route(self, request):
- plugin = self.callsigns[request.match_info["name"]]
- route_path = request.match_info["path"]
- self.logger.info(path)
-
- ret = ""
-
- file_path = path.join(self.plugin_path, plugin.plugin_directory, route_path)
- with open(file_path, 'r') as resource_data:
- ret = resource_data.read()
-
- return web.Response(text=ret)
-
- async def load_plugin_tile_view(self, request):
- plugin = self.callsigns[request.match_info["name"]]
-
- inner_content = ""
-
- # open up the tile template (if we have one defined)
- if hasattr(plugin, "tile_view_html"):
- with open(path.join(self.plugin_path, plugin.plugin_directory, plugin.tile_view_html), 'r') as template:
- template_data = template.read()
- inner_content = template_data
-
- # setup the default template
- ret = f"""
- <html style="height: fit-content;">
- <head>
- <link rel="stylesheet" href="/static/styles.css">
- <script src="/static/library.js"></script>
- <script>const plugin_name = '{plugin.callsign}';</script>
- </head>
- <body style="height: fit-content; display: block;">
- {inner_content}
- </body>
- <html>
- """
- return web.Response(text=ret, content_type="text/html")
-
- @template('plugin_view.html')
- async def plugin_iframe_route(self, request):
- return {"plugins": self.plugins.values()}
-
- async def refresh_iframe(self):
- tab = await get_tab("QuickAccess")
- await tab.open_websocket()
- return await tab.evaluate_js("reloadIframe()", False)