From 0a12fe6102da33977548ba0c277bd4fe34e262ab Mon Sep 17 00:00:00 2001 From: Jonas Dellinger Date: Thu, 16 Jun 2022 18:33:43 +0200 Subject: First draft of backend independent plugins --- .vscode/settings.json | 6 ++ backend/loader.py | 28 +++++----- backend/plugin.py | 115 -------------------------------------- backend/plugin/binary_plugin.py | 48 ++++++++++++++++ backend/plugin/passive_plugin.py | 15 +++++ backend/plugin/plugin.py | 18 ++++++ backend/plugin/python_plugin.py | 117 +++++++++++++++++++++++++++++++++++++++ backend/plugin_protocol.py | 46 +++++++++++++++ backend/plugin_wrapper.py | 34 ++++++++++++ 9 files changed, 297 insertions(+), 130 deletions(-) create mode 100644 .vscode/settings.json delete mode 100644 backend/plugin.py create mode 100644 backend/plugin/binary_plugin.py create mode 100644 backend/plugin/passive_plugin.py create mode 100644 backend/plugin/plugin.py create mode 100644 backend/plugin/python_plugin.py create mode 100644 backend/plugin_protocol.py create mode 100644 backend/plugin_wrapper.py diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 00000000..94ee920a --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,6 @@ +{ + "[python]": { + "editor.detectIndentation": false, + "editor.tabSize": 4 + } +} diff --git a/backend/loader.py b/backend/loader.py index c420fafe..6d9c9c0f 100644 --- a/backend/loader.py +++ b/backend/loader.py @@ -1,4 +1,4 @@ -from asyncio import Queue +from asyncio import Queue, get_event_loop, sleep, wait_for from json.decoder import JSONDecodeError from logging import getLogger from os import listdir, path @@ -16,7 +16,7 @@ except UnsupportedLibc: from watchdog.observers.fsevents import FSEventsObserver as Observer from injector import get_tab, inject_to_tab -from plugin import PluginWrapper +from plugin_wrapper import PluginWrapper class FileChangeHandler(RegexMatchingEventHandler): @@ -29,7 +29,7 @@ class FileChangeHandler(RegexMatchingEventHandler): def maybe_reload(self, src_path): plugin_dir = Path(path.relpath(src_path, self.plugin_path)).parts[0] if exists(path.join(self.plugin_path, plugin_dir, "plugin.json")): - self.queue.put_nowait((path.join(self.plugin_path, plugin_dir, "main.py"), plugin_dir, True)) + self.queue.put_nowait(plugin_dir, True) def on_created(self, event): src_path = event.src_path @@ -102,9 +102,9 @@ class Loader: with open(path.join(self.plugin_path, plugin.plugin_directory, "dist/index.js"), 'r') as bundle: return web.Response(text=bundle.read(), content_type="application/javascript") - def import_plugin(self, file, plugin_directory, refresh=False): + def import_plugin(self, plugin_directory, refresh=False): try: - plugin = PluginWrapper(file, plugin_directory, self.plugin_path) + plugin = PluginWrapper(plugin_directory, self.plugin_path) if plugin.name in self.plugins: if not "debug" in plugin.flags and refresh: self.logger.info(f"Plugin {plugin.name} is already loaded and has requested to not be re-loaded") @@ -112,13 +112,12 @@ class Loader: else: self.plugins[plugin.name].stop() self.plugins.pop(plugin.name, None) - if plugin.passive: - self.logger.info(f"Plugin {plugin.name} is passive") - self.plugins[plugin.name] = plugin.start() + self.plugins[plugin.name] = plugin + self.loop.create_task(plugin.start()) self.logger.info(f"Loaded {plugin.name}") self.loop.create_task(self.dispatch_plugin(plugin.name)) except Exception as e: - self.logger.error(f"Could not load {file}. {e}") + self.logger.error(f"Could not load {plugin_directory}. {e}") print_exc() async def dispatch_plugin(self, name): @@ -130,7 +129,7 @@ class Loader: directories = [i for i in listdir(self.plugin_path) if path.isdir(path.join(self.plugin_path, i)) and path.isfile(path.join(self.plugin_path, i, "plugin.json"))] for directory in directories: self.logger.info(f"found plugin: {directory}") - self.import_plugin(path.join(self.plugin_path, directory, "main.py"), directory) + self.import_plugin(directory) async def handle_reloads(self): while True: @@ -143,16 +142,15 @@ class Loader: method_name = request.match_info["method_name"] try: method_info = await request.json() - args = method_info["args"] + method_args = method_info["args"] except JSONDecodeError: - args = {} + method_args = {} try: if method_name.startswith("_"): raise RuntimeError("Tried to call private method") - res["result"] = await plugin.execute_method(method_name, args) - res["success"] = True + res = await plugin.call_method(method_name, method_args) except Exception as e: - res["result"] = str(e) + res["result"] = repr(e) res["success"] = False return web.json_response(res) diff --git a/backend/plugin.py b/backend/plugin.py deleted file mode 100644 index 11c94a05..00000000 --- a/backend/plugin.py +++ /dev/null @@ -1,115 +0,0 @@ -import multiprocessing -from asyncio import (Lock, get_event_loop, new_event_loop, - open_unix_connection, set_event_loop, sleep, - start_unix_server) -from concurrent.futures import ProcessPoolExecutor -from importlib.util import module_from_spec, spec_from_file_location -from json import dumps, load, loads -from os import path, setuid -from signal import SIGINT, signal -from sys import exit -from time import time - -multiprocessing.set_start_method("fork") - -class PluginWrapper: - def __init__(self, file, plugin_directory, plugin_path) -> None: - self.file = file - self.plugin_directory = plugin_directory - self.reader = None - self.writer = None - self.socket_addr = f"/tmp/plugin_socket_{time()}" - self.method_call_lock = Lock() - - json = load(open(path.join(plugin_path, plugin_directory, "plugin.json"), "r")) - - self.legacy = False - self.main_view_html = json["main_view_html"] if "main_view_html" in json else "" - self.tile_view_html = json["tile_view_html"] if "tile_view_html" in json else "" - self.legacy = self.main_view_html or self.tile_view_html - - self.name = json["name"] - self.author = json["author"] - self.flags = json["flags"] - - self.passive = not path.isfile(self.file) - - def __str__(self) -> str: - return self.name - - def _init(self): - signal(SIGINT, lambda s, f: exit(0)) - - set_event_loop(new_event_loop()) - if self.passive: - return - setuid(0 if "root" in self.flags else 1000) - spec = spec_from_file_location("_", self.file) - module = module_from_spec(spec) - spec.loader.exec_module(module) - self.Plugin = module.Plugin - - if hasattr(self.Plugin, "_main"): - get_event_loop().create_task(self.Plugin._main(self.Plugin)) - get_event_loop().create_task(self._setup_socket()) - get_event_loop().run_forever() - - async def _setup_socket(self): - self.socket = await start_unix_server(self._listen_for_method_call, path=self.socket_addr) - - async def _listen_for_method_call(self, reader, writer): - while True: - data = loads((await reader.readline()).decode("utf-8")) - if "stop" in data: - get_event_loop().stop() - while get_event_loop().is_running(): - await sleep(0) - get_event_loop().close() - return - d = {"res": None, "success": True} - try: - d["res"] = await getattr(self.Plugin, data["method"])(self.Plugin, **data["args"]) - except Exception as e: - d["res"] = str(e) - d["success"] = False - finally: - writer.write((dumps(d)+"\n").encode("utf-8")) - await writer.drain() - - async def _open_socket_if_not_exists(self): - if not self.reader: - while True: - try: - self.reader, self.writer = await open_unix_connection(self.socket_addr) - break - except: - await sleep(0) - - def start(self): - if self.passive: - return self - multiprocessing.Process(target=self._init).start() - return self - - def stop(self): - if self.passive: - return - async def _(self): - await self._open_socket_if_not_exists() - self.writer.write((dumps({"stop": True})+"\n").encode("utf-8")) - await self.writer.drain() - self.writer.close() - get_event_loop().create_task(_(self)) - - async def execute_method(self, method_name, kwargs): - if self.passive: - raise RuntimeError("This plugin is passive (aka does not implement main.py)") - async with self.method_call_lock: - await self._open_socket_if_not_exists() - self.writer.write( - (dumps({"method": method_name, "args": kwargs})+"\n").encode("utf-8")) - await self.writer.drain() - res = loads((await self.reader.readline()).decode("utf-8")) - if not res["success"]: - raise Exception(res["res"]) - return res["res"] diff --git a/backend/plugin/binary_plugin.py b/backend/plugin/binary_plugin.py new file mode 100644 index 00000000..4bd4f39b --- /dev/null +++ b/backend/plugin/binary_plugin.py @@ -0,0 +1,48 @@ +import os +from asyncio import get_event_loop, sleep, subprocess +from posixpath import join +from tempfile import mkdtemp + +from plugin_protocol import PluginProtocolServer + + +class BinaryPlugin: + def __init__(self, plugin_directory, file_name, flags, logger) -> None: + self.server = PluginProtocolServer(self) + self.connection = None + self.flags = flags + self.logger = logger + + self.plugin_directory = plugin_directory + self.file_name = file_name + + + async def start(self): + if self.connection: + self.connection.close() + + self.unix_socket_path = BinaryPlugin.generate_socket_path() + self.logger.debug(f"starting unix server on {self.unix_socket_path}") + self.connection = await get_event_loop().create_unix_server(lambda: self.server, path=self.unix_socket_path) + + env = dict(DECKY_PLUGIN_SOCKET = self.unix_socket_path) + self.process = await subprocess.create_subprocess_exec(join(self.plugin_directory, self.file_name), env=env) + get_event_loop().create_task(self.process_loop()) + + async def process_loop(self): + await self.process.wait() + self.logger.info("backend process was killed - restarting in 10 seconds") + await sleep(10) + await self.start() + + def generate_socket_path(): + tmp_dir = mkdtemp("decky-plugin") + os.chown(tmp_dir, 1000, 1000) + return join(tmp_dir, "socket") + + # called on the server/loader process + async def call_method(self, method_name, method_args): + if self.process.returncode == None: + return dict(success = False, result = "Process not alive") + + return await self.server.call_method(method_name, method_args) diff --git a/backend/plugin/passive_plugin.py b/backend/plugin/passive_plugin.py new file mode 100644 index 00000000..9b7907bb --- /dev/null +++ b/backend/plugin/passive_plugin.py @@ -0,0 +1,15 @@ +class PassivePlugin: + def __init__(self, logger) -> None: + self.logger + pass + + def call_method(self, method_name, args): + self.logger.debug(f"Tried to call method {method_name}, but plugin is in passive mode") + pass + + def execute_method(self, method_name, method_args): + self.logger.debug(f"Tried to execute method {method_name}, but plugin is in passive mode") + pass + + async def start(self): + pass# Empty stub diff --git a/backend/plugin/plugin.py b/backend/plugin/plugin.py new file mode 100644 index 00000000..3e359ea7 --- /dev/null +++ b/backend/plugin/plugin.py @@ -0,0 +1,18 @@ +from posixpath import join + +from genericpath import isfile + +from plugin.binary_plugin import BinaryPlugin +from plugin.passive_plugin import PassivePlugin +from plugin.python_plugin import PythonPlugin + + +def get_plugin_backend(spec, plugin_directory, flags, logger): + if spec == None and isfile(join(plugin_directory, "main.py")): + return PythonPlugin(plugin_directory, "main.py", flags, logger) + elif spec["type"] == "python": + return PythonPlugin(plugin_directory, spec["file"], flags, logger) + elif spec["type"] == "binary": + return BinaryPlugin(plugin_directory, spec["file"], flags, logger) + else: + return PassivePlugin(logger) diff --git a/backend/plugin/python_plugin.py b/backend/plugin/python_plugin.py new file mode 100644 index 00000000..c2833088 --- /dev/null +++ b/backend/plugin/python_plugin.py @@ -0,0 +1,117 @@ +import json +import multiprocessing +import os +import uuid +from asyncio import (Protocol, get_event_loop, new_event_loop, set_event_loop, + sleep) +from importlib.util import module_from_spec, spec_from_file_location +from posixpath import join +from signal import SIGINT, signal +from tempfile import mkdtemp + +from plugin_protocol import PluginProtocolServer + +multiprocessing.set_start_method("fork") + +# only useable by the python backend +class PluginProtocolClient(Protocol): + def __init__(self, backend, logger) -> None: + super().__init__() + self.backend = backend + self.logger = logger + + def connection_made(self, transport): + self.transport = transport + + def data_received(self, data: bytes) -> None: + message = json.loads(data.decode("utf-8")) + message_id = str(uuid.UUID(message["id"])) + message_type = message["type"] + payload = message["payload"] + + self.logger.debug(f"received {message_id} {message_type} {payload}") + if message_type == "method_call": + get_event_loop().create_task(self.handle_method_call(message_id, payload["name"], payload["args"])) + + async def handle_method_call(self, message_id, method_name, method_args): + try: + result = await self.backend.execute_method(method_name, method_args) + self.respond_message(message_id, "method_response", dict(success = True, result = result)) + except AttributeError as e: + self.respond_message(message_id, "method_response", dict(success = False, result = f"plugin does not expose a method called {method_name}")) + except Exception as e: + self.respond_message(message_id, "method_response", dict(success = False, result = str(e))) + + def respond_message(self, message_id, message_type, payload): + self.logger.debug(f"sending {message_id} {message_type} {payload}") + message = json.dumps(dict(id = str(message_id), type = message_type, payload = payload)) + self.transport.write(message.encode('utf-8')) + + +class PythonPlugin: + def __init__(self, plugin_directory, file_name, flags, logger) -> None: + self.client = PluginProtocolClient(self, logger) + self.server = PluginProtocolServer(self) + self.connection = None + + self.plugin_directory = plugin_directory + self.file_name = file_name + self.flags = flags + self.logger = logger + + def _init(self): + self.logger.debug(f"child process Initializing") + signal(SIGINT, lambda s, f: exit(0)) + + set_event_loop(new_event_loop()) + # TODO: both processes can access the socket + # setuid(0 if "root" in self.flags else 1000) + spec = spec_from_file_location("_", join(self.plugin_directory, self.file_name)) + module = module_from_spec(spec) + spec.loader.exec_module(module) + self.Plugin = module.Plugin + + if hasattr(self.Plugin, "_main"): + self.logger.debug("Found _main, calling it") + get_event_loop().create_task(self.Plugin._main(self.Plugin)) + + get_event_loop().create_task(self._connect()) + get_event_loop().run_forever() + + async def _connect(self): + self.logger.debug(f"connecting to unix server on {self.unix_socket_path}") + await get_event_loop().create_unix_connection(lambda: self.client, path=self.unix_socket_path) + + async def start(self): + if self.connection: + self.connection.close() + + self.unix_socket_path = PythonPlugin.generate_socket_path() + self.logger.debug(f"starting unix server on {self.unix_socket_path}") + self.connection = await get_event_loop().create_unix_server(lambda: self.server, path=self.unix_socket_path) + + self.process = multiprocessing.Process(target=self._init) + self.process.start() + get_event_loop().create_task(self.process_loop()) + + async def process_loop(self): + await get_event_loop().run_in_executor(None, self.process.join) + self.logger.info("backend process was killed - restarting in 10 seconds") + await sleep(10) + await self.start() + + # called on the server/loader process + async def call_method(self, method_name, method_args): + if not self.process.is_alive(): + return dict(success = False, result = "Process not alive") + + return await self.server.call_method(method_name, method_args) + + # called on the client + def execute_method(self, method_name, method_args): + return getattr(self.Plugin, method_name)(self.Plugin, **method_args) + + def generate_socket_path(): + tmp_dir = mkdtemp("decky-plugin") + os.chown(tmp_dir, 1000, 1000) + return join(tmp_dir, "socket") diff --git a/backend/plugin_protocol.py b/backend/plugin_protocol.py new file mode 100644 index 00000000..445a1e3a --- /dev/null +++ b/backend/plugin_protocol.py @@ -0,0 +1,46 @@ +import json +import uuid +from asyncio import Protocol, TimeoutError, get_event_loop, wait_for +from gc import callbacks +from subprocess import call + + +class PluginProtocolServer(Protocol): + def __init__(self, backend) -> None: + super().__init__() + self.backend = backend + self.callbacks = {} + + def connection_made(self, transport): + self.transport = transport + + def data_received(self, data: bytes) -> None: + message = json.loads(data.decode("utf-8")) + message_id = str(uuid.UUID(message["id"])) + message_type = message["type"] + payload = message["payload"] + + if message_type == "method_response": + get_event_loop().create_task(self.handle_method_response(message_id, payload["success"], payload["result"])) + + async def handle_method_response(self, message_id, success, result): + if message_id in self.callbacks: + self.callbacks[message_id].set_result(dict(success = success, result = result)) + del self.callbacks[message_id] + + async def send_message(self, type, payload): + id = str(uuid.uuid4()) + callback = get_event_loop().create_future() + message = json.dumps(dict(id = id, type = type, payload = payload)) + + self.callbacks[id] = callback + self.transport.write(message.encode('utf-8')) + + try: + return await wait_for(callback, 10) + except TimeoutError as e: + del self.callbacks[id] + raise e + + def call_method(self, method_name, method_args): + return self.send_message("method_call", dict(name = method_name, args = method_args)) diff --git a/backend/plugin_wrapper.py b/backend/plugin_wrapper.py new file mode 100644 index 00000000..d41454df --- /dev/null +++ b/backend/plugin_wrapper.py @@ -0,0 +1,34 @@ +import multiprocessing +from json import load +from logging import getLogger +from os import path + +from plugin.plugin import get_plugin_backend + + +class PluginWrapper: + def __init__(self, plugin_relative_directory, plugin_path) -> None: + self.plugin_directory = path.join(plugin_path, plugin_relative_directory) + + json = load(open(path.join(self.plugin_directory, "plugin.json"), "r")) + + self.legacy = False + self.main_view_html = json["main_view_html"] if "main_view_html" in json else "" + self.tile_view_html = json["tile_view_html"] if "tile_view_html" in json else "" + self.legacy = self.main_view_html or self.tile_view_html + + self.name = json["name"] + self.author = json["author"] + self.flags = json["flags"] + self.logger = getLogger(f"{self.name}") + + self.backend = get_plugin_backend(json.get("backend"), self.plugin_directory, self.flags, self.logger) + + def call_method(self, method_name, args): + return self.backend.call_method(method_name, args) + + def start(self): + return self.backend.start() + + def __str__(self) -> str: + return self.name -- cgit v1.2.3