summaryrefslogtreecommitdiff
path: root/backend/plugin.py
diff options
context:
space:
mode:
Diffstat (limited to 'backend/plugin.py')
-rw-r--r--backend/plugin.py139
1 files changed, 46 insertions, 93 deletions
diff --git a/backend/plugin.py b/backend/plugin.py
index dea35299..7bfe1bfe 100644
--- a/backend/plugin.py
+++ b/backend/plugin.py
@@ -1,32 +1,27 @@
import multiprocessing
from asyncio import (Lock, get_event_loop, new_event_loop,
- open_unix_connection, set_event_loop, sleep,
- start_unix_server, IncompleteReadError, LimitOverrunError)
+ set_event_loop, sleep)
from concurrent.futures import ProcessPoolExecutor
from importlib.util import module_from_spec, spec_from_file_location
from json import dumps, load, loads
from logging import getLogger
from traceback import format_exc
-from os import path, setgid, setuid, environ
+from os import path, environ
from signal import SIGINT, signal
from sys import exit, path as syspath
from time import time
+from localsocket import LocalSocket
+from localplatform import setgid, setuid, get_username, get_home_path
+from customtypes import UserType
import helpers
-from updater import Updater
-
-multiprocessing.set_start_method("fork")
-
-BUFFER_LIMIT = 2 ** 20 # 1 MiB
class PluginWrapper:
def __init__(self, file, plugin_directory, plugin_path) -> None:
self.file = file
self.plugin_path = plugin_path
self.plugin_directory = plugin_directory
- self.reader = None
- self.writer = None
- self.socket_addr = f"/tmp/plugin_socket_{time()}"
self.method_call_lock = Lock()
+ self.socket = LocalSocket(self._on_new_message)
self.version = None
@@ -35,7 +30,6 @@ class PluginWrapper:
package_json = load(open(path.join(plugin_path, plugin_directory, "package.json"), "r", encoding="utf-8"))
self.version = package_json["version"]
-
self.legacy = False
self.main_view_html = json["main_view_html"] if "main_view_html" in json else ""
self.tile_view_html = json["tile_view_html"] if "tile_view_html" in json else ""
@@ -59,13 +53,13 @@ class PluginWrapper:
set_event_loop(new_event_loop())
if self.passive:
return
- setgid(0 if "root" in self.flags else helpers.get_user_group_id())
- setuid(0 if "root" in self.flags else helpers.get_user_id())
+ setgid(UserType.ROOT if "root" in self.flags else UserType.HOST_USER)
+ setuid(UserType.ROOT if "root" in self.flags else UserType.HOST_USER)
# export a bunch of environment variables to help plugin developers
- environ["HOME"] = helpers.get_home_path("root" if "root" in self.flags else helpers.get_user())
- environ["USER"] = "root" if "root" in self.flags else helpers.get_user()
+ environ["HOME"] = get_home_path(UserType.ROOT if "root" in self.flags else UserType.HOST_USER)
+ environ["USER"] = "root" if "root" in self.flags else get_username()
environ["DECKY_VERSION"] = helpers.get_loader_version()
- environ["DECKY_USER"] = helpers.get_user()
+ environ["DECKY_USER"] = get_username()
environ["DECKY_USER_HOME"] = helpers.get_home_path()
environ["DECKY_HOME"] = helpers.get_homebrew_path()
environ["DECKY_PLUGIN_SETTINGS_DIR"] = path.join(environ["DECKY_HOME"], "settings", self.plugin_directory)
@@ -78,12 +72,10 @@ class PluginWrapper:
environ["DECKY_PLUGIN_NAME"] = self.name
environ["DECKY_PLUGIN_VERSION"] = self.version
environ["DECKY_PLUGIN_AUTHOR"] = self.author
- # append the loader's plugin path to the recognized python paths
- syspath.append(path.realpath(path.join(path.dirname(__file__), "plugin")))
+
# append the plugin's `py_modules` to the recognized python paths
syspath.append(path.join(environ["DECKY_PLUGIN_DIR"], "py_modules"))
- # append the system and user python paths
- syspath.extend(helpers.get_system_pythonpaths())
+
spec = spec_from_file_location("_", self.file)
module = module_from_spec(spec)
spec.loader.exec_module(module)
@@ -93,7 +85,7 @@ class PluginWrapper:
get_event_loop().run_until_complete(self.Plugin._migration(self.Plugin))
if hasattr(self.Plugin, "_main"):
get_event_loop().create_task(self.Plugin._main(self.Plugin))
- get_event_loop().create_task(self._setup_socket())
+ get_event_loop().create_task(self.socket.setup_server())
get_event_loop().run_forever()
except:
self.log.error("Failed to start " + self.name + "!\n" + format_exc())
@@ -111,55 +103,26 @@ class PluginWrapper:
self.log.error("Failed to unload " + self.name + "!\n" + format_exc())
exit(0)
- async def _setup_socket(self):
- self.socket = await start_unix_server(self._listen_for_method_call, path=self.socket_addr, limit=BUFFER_LIMIT)
-
- async def _listen_for_method_call(self, reader, writer):
- while True:
- line = bytearray()
- while True:
- try:
- line.extend(await reader.readuntil())
- except LimitOverrunError:
- line.extend(await reader.read(reader._limit))
- continue
- except IncompleteReadError as err:
- line.extend(err.partial)
- break
- else:
- break
- data = loads(line.decode("utf-8"))
- if "stop" in data:
- self.log.info("Calling Loader unload function.")
- await self._unload()
- get_event_loop().stop()
- while get_event_loop().is_running():
- await sleep(0)
- get_event_loop().close()
- return
- d = {"res": None, "success": True}
- try:
- d["res"] = await getattr(self.Plugin, data["method"])(self.Plugin, **data["args"])
- except Exception as e:
- d["res"] = str(e)
- d["success"] = False
- finally:
- writer.write((dumps(d, ensure_ascii=False)+"\n").encode("utf-8"))
- await writer.drain()
-
- async def _open_socket_if_not_exists(self):
- if not self.reader:
- retries = 0
- while retries < 10:
- try:
- self.reader, self.writer = await open_unix_connection(self.socket_addr, limit=BUFFER_LIMIT)
- return True
- except:
- await sleep(2)
- retries += 1
- return False
- else:
- return True
+ async def _on_new_message(self, message : str) -> str|None:
+ data = loads(message)
+
+ if "stop" in data:
+ self.log.info("Calling Loader unload function.")
+ await self._unload()
+ get_event_loop().stop()
+ while get_event_loop().is_running():
+ await sleep(0)
+ get_event_loop().close()
+ raise Exception("Closing message listener")
+
+ d = {"res": None, "success": True}
+ try:
+ d["res"] = await getattr(self.Plugin, data["method"])(self.Plugin, **data["args"])
+ except Exception as e:
+ d["res"] = str(e)
+ d["success"] = False
+ finally:
+ return dumps(d, ensure_ascii=False)
def start(self):
if self.passive:
@@ -170,34 +133,24 @@ class PluginWrapper:
def stop(self):
if self.passive:
return
+
async def _(self):
- if await self._open_socket_if_not_exists():
- self.writer.write((dumps({ "stop": True }, ensure_ascii=False)+"\n").encode("utf-8"))
- await self.writer.drain()
- self.writer.close()
+ await self.socket.write_single_line(dumps({ "stop": True }, ensure_ascii=False))
+ await self.socket.close_socket_connection()
+
get_event_loop().create_task(_(self))
async def execute_method(self, method_name, kwargs):
if self.passive:
raise RuntimeError("This plugin is passive (aka does not implement main.py)")
async with self.method_call_lock:
- if await self._open_socket_if_not_exists():
- self.writer.write(
- (dumps({ "method": method_name, "args": kwargs }, ensure_ascii=False) + "\n").encode("utf-8"))
- await self.writer.drain()
- line = bytearray()
- while True:
- try:
- line.extend(await self.reader.readuntil())
- except LimitOverrunError:
- line.extend(await self.reader.read(self.reader._limit))
- continue
- except IncompleteReadError as err:
- line.extend(err.partial)
- break
- else:
- break
- res = loads(line.decode("utf-8"))
+ reader, writer = await self.socket.get_socket_connection()
+
+ await self.socket.write_single_line(dumps({ "method": method_name, "args": kwargs }, ensure_ascii=False))
+
+ line = await self.socket.read_single_line()
+ if line != None:
+ res = loads(line)
if not res["success"]:
raise Exception(res["res"])
- return res["res"]
+ return res["res"] \ No newline at end of file