from aiohttp import web
from aiohttp_jinja2 import template
from watchdog.observers.polling import PollingObserver as Observer
from watchdog.events import FileSystemEventHandler
from asyncio import Queue
from os import path, listdir
from logging import getLogger
from time import time
from injector import get_tabs, get_tab
from plugin import PluginWrapper
from traceback import print_exc
class FileChangeHandler(FileSystemEventHandler):
def __init__(self, queue, plugin_path) -> None:
super().__init__()
self.logger = getLogger("file-watcher")
self.plugin_path = plugin_path
self.queue = queue
def on_created(self, event):
src_path = event.src_path
if "__pycache__" in src_path:
return
# check to make sure this isn't a directory
if path.isdir(src_path):
return
# get the directory name of the plugin so that we can find its "main.py" and reload it; the
# file that changed is not necessarily the one that needs to be reloaded
self.logger.debug(f"file created: {src_path}")
rel_path = path.relpath(src_path, path.commonprefix([self.plugin_path, src_path]))
plugin_dir = path.split(rel_path)[0]
main_file_path = path.join(self.plugin_path, plugin_dir, "main.py")
self.queue.put_nowait((main_file_path, plugin_dir, True))
def on_modified(self, event):
src_path = event.src_path
if "__pycache__" in src_path:
return
# check to make sure this isn't a directory
if path.isdir(src_path):
return
# get the directory name of the plugin so that we can find its "main.py" and reload it; the
# file that changed is not necessarily the one that needs to be reloaded
self.logger.debug(f"file modified: {src_path}")
plugin_dir = path.split(path.relpath(src_path, path.commonprefix([self.plugin_path, src_path])))[0]
self.queue.put_nowait((path.join(self.plugin_path, plugin_dir, "main.py"), plugin_dir, True))
class Loader:
def __init__(self, server_instance, plugin_path, loop, live_reload=False) -> None:
self.loop = loop
self.logger = getLogger("Loader")
self.plugin_path = plugin_path
self.logger.info(f"plugin_path: {self.plugin_path}")
self.plugins = {}
self.callsigns = {}
self.callsign_matches = {}
self.import_plugins()
if live_reload:
self.reload_queue = Queue()
self.observer = Observer()
self.observer.schedule(FileChangeHandler(self.reload_queue, plugin_path), self.plugin_path, recursive=True)
self.observer.start()
self.loop.create_task(self.handle_reloads())
server_instance.add_routes([
web.get("/plugins/iframe", self.plugin_iframe_route),
web.get("/plugins/load_main/{name}", self.load_plugin_main_view),
web.get("/plugins/plugin_resource/{name}/{path:.+}", self.handle_sub_route),
web.get("/plugins/load_tile/{name}", self.load_plugin_tile_view),
web.get("/steam_resource/{path:.+}", self.get_steam_resource)
])
def import_plugin(self, file, plugin_directory, refresh=False):
try:
plugin = PluginWrapper(file, plugin_directory, self.plugin_path)
if plugin.name in self.plugins:
if not "debug" in plugin.flags and refresh:
self.logger.info(f"Plugin {plugin.name} is already loaded and has requested to not be re-loaded")
return
else:
self.plugins[plugin.name].stop()
self.plugins.pop(plugin.name, None)
self.callsigns.pop(self.callsign_matches[file], None)
if plugin.passive:
self.logger.info(f"Plugin {plugin.name} is passive")
callsign = str(time())
plugin.callsign = callsign
self.plugins[plugin.name] = plugin.start()
self.callsigns[callsign] = plugin
self.callsign_matches[file] = callsign
self.logger.info(f"Loaded {plugin.name}")
except Exception as e:
self.logger.error(f"Could not load {file}. {e}")
print_exc()
finally:
if refresh:
self.loop.create_task(self.refresh_iframe())
def import_plugins(self):
self.logger.info(f"import plugins from {self.plugin_path}")
directories = [i for i in listdir(self.plugin_path) if path.isdir(path.join(self.plugin_path, i)) and path.isfile(path.join(self.plugin_path, i, "plugin.json"))]
for directory in directories:
self.logger.info(f"found plugin: {directory}")
self.import_plugin(path.join(self.plugin_path, directory, "main.py"), directory)
async def handle_reloads(self):
while True:
args = await self.reload_queue.get()
self.import_plugin(*args)
async def handle_plugin_method_call(self, callsign, method_name, **kwargs):
if method_name.startswith("_"):
raise RuntimeError("Tried to call private method")
return await self.callsigns[callsign].execute_method(method_name, kwargs)
async def get_steam_resource(self, request):
tab = (await get_tabs())[0]
try:
return web.Response(text=await tab.get_steam_resource(f"https://steamloopback.host/{request.match_info['path']}"), content_type="text/html")
except Exception as e:
return web.Response(text=str(e), status=400)
async def load_plugin_main_view(self, request):
plugin = self.callsigns[request.match_info["name"]]
# open up the main template
with open(path.join(self.plugin_path, plugin.plugin_directory, plugin.main_view_html), 'r') as template:
template_data = template.read()
# setup the main script, plugin, and pull in the template
ret = f"""