diff options
Diffstat (limited to 'backend/plugin.py')
| -rw-r--r-- | backend/plugin.py | 139 |
1 files changed, 46 insertions, 93 deletions
diff --git a/backend/plugin.py b/backend/plugin.py index dea35299..7bfe1bfe 100644 --- a/backend/plugin.py +++ b/backend/plugin.py @@ -1,32 +1,27 @@ import multiprocessing from asyncio import (Lock, get_event_loop, new_event_loop, - open_unix_connection, set_event_loop, sleep, - start_unix_server, IncompleteReadError, LimitOverrunError) + set_event_loop, sleep) from concurrent.futures import ProcessPoolExecutor from importlib.util import module_from_spec, spec_from_file_location from json import dumps, load, loads from logging import getLogger from traceback import format_exc -from os import path, setgid, setuid, environ +from os import path, environ from signal import SIGINT, signal from sys import exit, path as syspath from time import time +from localsocket import LocalSocket +from localplatform import setgid, setuid, get_username, get_home_path +from customtypes import UserType import helpers -from updater import Updater - -multiprocessing.set_start_method("fork") - -BUFFER_LIMIT = 2 ** 20 # 1 MiB class PluginWrapper: def __init__(self, file, plugin_directory, plugin_path) -> None: self.file = file self.plugin_path = plugin_path self.plugin_directory = plugin_directory - self.reader = None - self.writer = None - self.socket_addr = f"/tmp/plugin_socket_{time()}" self.method_call_lock = Lock() + self.socket = LocalSocket(self._on_new_message) self.version = None @@ -35,7 +30,6 @@ class PluginWrapper: package_json = load(open(path.join(plugin_path, plugin_directory, "package.json"), "r", encoding="utf-8")) self.version = package_json["version"] - self.legacy = False self.main_view_html = json["main_view_html"] if "main_view_html" in json else "" self.tile_view_html = json["tile_view_html"] if "tile_view_html" in json else "" @@ -59,13 +53,13 @@ class PluginWrapper: set_event_loop(new_event_loop()) if self.passive: return - setgid(0 if "root" in self.flags else helpers.get_user_group_id()) - setuid(0 if "root" in self.flags else helpers.get_user_id()) + setgid(UserType.ROOT if "root" in self.flags else UserType.HOST_USER) + setuid(UserType.ROOT if "root" in self.flags else UserType.HOST_USER) # export a bunch of environment variables to help plugin developers - environ["HOME"] = helpers.get_home_path("root" if "root" in self.flags else helpers.get_user()) - environ["USER"] = "root" if "root" in self.flags else helpers.get_user() + environ["HOME"] = get_home_path(UserType.ROOT if "root" in self.flags else UserType.HOST_USER) + environ["USER"] = "root" if "root" in self.flags else get_username() environ["DECKY_VERSION"] = helpers.get_loader_version() - environ["DECKY_USER"] = helpers.get_user() + environ["DECKY_USER"] = get_username() environ["DECKY_USER_HOME"] = helpers.get_home_path() environ["DECKY_HOME"] = helpers.get_homebrew_path() environ["DECKY_PLUGIN_SETTINGS_DIR"] = path.join(environ["DECKY_HOME"], "settings", self.plugin_directory) @@ -78,12 +72,10 @@ class PluginWrapper: environ["DECKY_PLUGIN_NAME"] = self.name environ["DECKY_PLUGIN_VERSION"] = self.version environ["DECKY_PLUGIN_AUTHOR"] = self.author - # append the loader's plugin path to the recognized python paths - syspath.append(path.realpath(path.join(path.dirname(__file__), "plugin"))) + # append the plugin's `py_modules` to the recognized python paths syspath.append(path.join(environ["DECKY_PLUGIN_DIR"], "py_modules")) - # append the system and user python paths - syspath.extend(helpers.get_system_pythonpaths()) + spec = spec_from_file_location("_", self.file) module = module_from_spec(spec) spec.loader.exec_module(module) @@ -93,7 +85,7 @@ class PluginWrapper: get_event_loop().run_until_complete(self.Plugin._migration(self.Plugin)) if hasattr(self.Plugin, "_main"): get_event_loop().create_task(self.Plugin._main(self.Plugin)) - get_event_loop().create_task(self._setup_socket()) + get_event_loop().create_task(self.socket.setup_server()) get_event_loop().run_forever() except: self.log.error("Failed to start " + self.name + "!\n" + format_exc()) @@ -111,55 +103,26 @@ class PluginWrapper: self.log.error("Failed to unload " + self.name + "!\n" + format_exc()) exit(0) - async def _setup_socket(self): - self.socket = await start_unix_server(self._listen_for_method_call, path=self.socket_addr, limit=BUFFER_LIMIT) - - async def _listen_for_method_call(self, reader, writer): - while True: - line = bytearray() - while True: - try: - line.extend(await reader.readuntil()) - except LimitOverrunError: - line.extend(await reader.read(reader._limit)) - continue - except IncompleteReadError as err: - line.extend(err.partial) - break - else: - break - data = loads(line.decode("utf-8")) - if "stop" in data: - self.log.info("Calling Loader unload function.") - await self._unload() - get_event_loop().stop() - while get_event_loop().is_running(): - await sleep(0) - get_event_loop().close() - return - d = {"res": None, "success": True} - try: - d["res"] = await getattr(self.Plugin, data["method"])(self.Plugin, **data["args"]) - except Exception as e: - d["res"] = str(e) - d["success"] = False - finally: - writer.write((dumps(d, ensure_ascii=False)+"\n").encode("utf-8")) - await writer.drain() - - async def _open_socket_if_not_exists(self): - if not self.reader: - retries = 0 - while retries < 10: - try: - self.reader, self.writer = await open_unix_connection(self.socket_addr, limit=BUFFER_LIMIT) - return True - except: - await sleep(2) - retries += 1 - return False - else: - return True + async def _on_new_message(self, message : str) -> str|None: + data = loads(message) + + if "stop" in data: + self.log.info("Calling Loader unload function.") + await self._unload() + get_event_loop().stop() + while get_event_loop().is_running(): + await sleep(0) + get_event_loop().close() + raise Exception("Closing message listener") + + d = {"res": None, "success": True} + try: + d["res"] = await getattr(self.Plugin, data["method"])(self.Plugin, **data["args"]) + except Exception as e: + d["res"] = str(e) + d["success"] = False + finally: + return dumps(d, ensure_ascii=False) def start(self): if self.passive: @@ -170,34 +133,24 @@ class PluginWrapper: def stop(self): if self.passive: return + async def _(self): - if await self._open_socket_if_not_exists(): - self.writer.write((dumps({ "stop": True }, ensure_ascii=False)+"\n").encode("utf-8")) - await self.writer.drain() - self.writer.close() + await self.socket.write_single_line(dumps({ "stop": True }, ensure_ascii=False)) + await self.socket.close_socket_connection() + get_event_loop().create_task(_(self)) async def execute_method(self, method_name, kwargs): if self.passive: raise RuntimeError("This plugin is passive (aka does not implement main.py)") async with self.method_call_lock: - if await self._open_socket_if_not_exists(): - self.writer.write( - (dumps({ "method": method_name, "args": kwargs }, ensure_ascii=False) + "\n").encode("utf-8")) - await self.writer.drain() - line = bytearray() - while True: - try: - line.extend(await self.reader.readuntil()) - except LimitOverrunError: - line.extend(await self.reader.read(self.reader._limit)) - continue - except IncompleteReadError as err: - line.extend(err.partial) - break - else: - break - res = loads(line.decode("utf-8")) + reader, writer = await self.socket.get_socket_connection() + + await self.socket.write_single_line(dumps({ "method": method_name, "args": kwargs }, ensure_ascii=False)) + + line = await self.socket.read_single_line() + if line != None: + res = loads(line) if not res["success"]: raise Exception(res["res"]) - return res["res"] + return res["res"]
\ No newline at end of file |
