summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.vscode/settings.json6
-rw-r--r--backend/loader.py28
-rw-r--r--backend/plugin.py115
-rw-r--r--backend/plugin/binary_plugin.py48
-rw-r--r--backend/plugin/passive_plugin.py15
-rw-r--r--backend/plugin/plugin.py18
-rw-r--r--backend/plugin/python_plugin.py117
-rw-r--r--backend/plugin_protocol.py46
-rw-r--r--backend/plugin_wrapper.py34
9 files changed, 297 insertions, 130 deletions
diff --git a/.vscode/settings.json b/.vscode/settings.json
new file mode 100644
index 00000000..94ee920a
--- /dev/null
+++ b/.vscode/settings.json
@@ -0,0 +1,6 @@
+{
+ "[python]": {
+ "editor.detectIndentation": false,
+ "editor.tabSize": 4
+ }
+}
diff --git a/backend/loader.py b/backend/loader.py
index c420fafe..6d9c9c0f 100644
--- a/backend/loader.py
+++ b/backend/loader.py
@@ -1,4 +1,4 @@
-from asyncio import Queue
+from asyncio import Queue, get_event_loop, sleep, wait_for
from json.decoder import JSONDecodeError
from logging import getLogger
from os import listdir, path
@@ -16,7 +16,7 @@ except UnsupportedLibc:
from watchdog.observers.fsevents import FSEventsObserver as Observer
from injector import get_tab, inject_to_tab
-from plugin import PluginWrapper
+from plugin_wrapper import PluginWrapper
class FileChangeHandler(RegexMatchingEventHandler):
@@ -29,7 +29,7 @@ class FileChangeHandler(RegexMatchingEventHandler):
def maybe_reload(self, src_path):
plugin_dir = Path(path.relpath(src_path, self.plugin_path)).parts[0]
if exists(path.join(self.plugin_path, plugin_dir, "plugin.json")):
- self.queue.put_nowait((path.join(self.plugin_path, plugin_dir, "main.py"), plugin_dir, True))
+ self.queue.put_nowait(plugin_dir, True)
def on_created(self, event):
src_path = event.src_path
@@ -102,9 +102,9 @@ class Loader:
with open(path.join(self.plugin_path, plugin.plugin_directory, "dist/index.js"), 'r') as bundle:
return web.Response(text=bundle.read(), content_type="application/javascript")
- def import_plugin(self, file, plugin_directory, refresh=False):
+ def import_plugin(self, plugin_directory, refresh=False):
try:
- plugin = PluginWrapper(file, plugin_directory, self.plugin_path)
+ plugin = PluginWrapper(plugin_directory, self.plugin_path)
if plugin.name in self.plugins:
if not "debug" in plugin.flags and refresh:
self.logger.info(f"Plugin {plugin.name} is already loaded and has requested to not be re-loaded")
@@ -112,13 +112,12 @@ class Loader:
else:
self.plugins[plugin.name].stop()
self.plugins.pop(plugin.name, None)
- if plugin.passive:
- self.logger.info(f"Plugin {plugin.name} is passive")
- self.plugins[plugin.name] = plugin.start()
+ self.plugins[plugin.name] = plugin
+ self.loop.create_task(plugin.start())
self.logger.info(f"Loaded {plugin.name}")
self.loop.create_task(self.dispatch_plugin(plugin.name))
except Exception as e:
- self.logger.error(f"Could not load {file}. {e}")
+ self.logger.error(f"Could not load {plugin_directory}. {e}")
print_exc()
async def dispatch_plugin(self, name):
@@ -130,7 +129,7 @@ class Loader:
directories = [i for i in listdir(self.plugin_path) if path.isdir(path.join(self.plugin_path, i)) and path.isfile(path.join(self.plugin_path, i, "plugin.json"))]
for directory in directories:
self.logger.info(f"found plugin: {directory}")
- self.import_plugin(path.join(self.plugin_path, directory, "main.py"), directory)
+ self.import_plugin(directory)
async def handle_reloads(self):
while True:
@@ -143,16 +142,15 @@ class Loader:
method_name = request.match_info["method_name"]
try:
method_info = await request.json()
- args = method_info["args"]
+ method_args = method_info["args"]
except JSONDecodeError:
- args = {}
+ method_args = {}
try:
if method_name.startswith("_"):
raise RuntimeError("Tried to call private method")
- res["result"] = await plugin.execute_method(method_name, args)
- res["success"] = True
+ res = await plugin.call_method(method_name, method_args)
except Exception as e:
- res["result"] = str(e)
+ res["result"] = repr(e)
res["success"] = False
return web.json_response(res)
diff --git a/backend/plugin.py b/backend/plugin.py
deleted file mode 100644
index 11c94a05..00000000
--- a/backend/plugin.py
+++ /dev/null
@@ -1,115 +0,0 @@
-import multiprocessing
-from asyncio import (Lock, get_event_loop, new_event_loop,
- open_unix_connection, set_event_loop, sleep,
- start_unix_server)
-from concurrent.futures import ProcessPoolExecutor
-from importlib.util import module_from_spec, spec_from_file_location
-from json import dumps, load, loads
-from os import path, setuid
-from signal import SIGINT, signal
-from sys import exit
-from time import time
-
-multiprocessing.set_start_method("fork")
-
-class PluginWrapper:
- def __init__(self, file, plugin_directory, plugin_path) -> None:
- self.file = file
- self.plugin_directory = plugin_directory
- self.reader = None
- self.writer = None
- self.socket_addr = f"/tmp/plugin_socket_{time()}"
- self.method_call_lock = Lock()
-
- json = load(open(path.join(plugin_path, plugin_directory, "plugin.json"), "r"))
-
- self.legacy = False
- self.main_view_html = json["main_view_html"] if "main_view_html" in json else ""
- self.tile_view_html = json["tile_view_html"] if "tile_view_html" in json else ""
- self.legacy = self.main_view_html or self.tile_view_html
-
- self.name = json["name"]
- self.author = json["author"]
- self.flags = json["flags"]
-
- self.passive = not path.isfile(self.file)
-
- def __str__(self) -> str:
- return self.name
-
- def _init(self):
- signal(SIGINT, lambda s, f: exit(0))
-
- set_event_loop(new_event_loop())
- if self.passive:
- return
- setuid(0 if "root" in self.flags else 1000)
- spec = spec_from_file_location("_", self.file)
- module = module_from_spec(spec)
- spec.loader.exec_module(module)
- self.Plugin = module.Plugin
-
- if hasattr(self.Plugin, "_main"):
- get_event_loop().create_task(self.Plugin._main(self.Plugin))
- get_event_loop().create_task(self._setup_socket())
- get_event_loop().run_forever()
-
- async def _setup_socket(self):
- self.socket = await start_unix_server(self._listen_for_method_call, path=self.socket_addr)
-
- async def _listen_for_method_call(self, reader, writer):
- while True:
- data = loads((await reader.readline()).decode("utf-8"))
- if "stop" in data:
- get_event_loop().stop()
- while get_event_loop().is_running():
- await sleep(0)
- get_event_loop().close()
- return
- d = {"res": None, "success": True}
- try:
- d["res"] = await getattr(self.Plugin, data["method"])(self.Plugin, **data["args"])
- except Exception as e:
- d["res"] = str(e)
- d["success"] = False
- finally:
- writer.write((dumps(d)+"\n").encode("utf-8"))
- await writer.drain()
-
- async def _open_socket_if_not_exists(self):
- if not self.reader:
- while True:
- try:
- self.reader, self.writer = await open_unix_connection(self.socket_addr)
- break
- except:
- await sleep(0)
-
- def start(self):
- if self.passive:
- return self
- multiprocessing.Process(target=self._init).start()
- return self
-
- def stop(self):
- if self.passive:
- return
- async def _(self):
- await self._open_socket_if_not_exists()
- self.writer.write((dumps({"stop": True})+"\n").encode("utf-8"))
- await self.writer.drain()
- self.writer.close()
- get_event_loop().create_task(_(self))
-
- async def execute_method(self, method_name, kwargs):
- if self.passive:
- raise RuntimeError("This plugin is passive (aka does not implement main.py)")
- async with self.method_call_lock:
- await self._open_socket_if_not_exists()
- self.writer.write(
- (dumps({"method": method_name, "args": kwargs})+"\n").encode("utf-8"))
- await self.writer.drain()
- res = loads((await self.reader.readline()).decode("utf-8"))
- if not res["success"]:
- raise Exception(res["res"])
- return res["res"]
diff --git a/backend/plugin/binary_plugin.py b/backend/plugin/binary_plugin.py
new file mode 100644
index 00000000..4bd4f39b
--- /dev/null
+++ b/backend/plugin/binary_plugin.py
@@ -0,0 +1,48 @@
+import os
+from asyncio import get_event_loop, sleep, subprocess
+from posixpath import join
+from tempfile import mkdtemp
+
+from plugin_protocol import PluginProtocolServer
+
+
+class BinaryPlugin:
+ def __init__(self, plugin_directory, file_name, flags, logger) -> None:
+ self.server = PluginProtocolServer(self)
+ self.connection = None
+ self.flags = flags
+ self.logger = logger
+
+ self.plugin_directory = plugin_directory
+ self.file_name = file_name
+
+
+ async def start(self):
+ if self.connection:
+ self.connection.close()
+
+ self.unix_socket_path = BinaryPlugin.generate_socket_path()
+ self.logger.debug(f"starting unix server on {self.unix_socket_path}")
+ self.connection = await get_event_loop().create_unix_server(lambda: self.server, path=self.unix_socket_path)
+
+ env = dict(DECKY_PLUGIN_SOCKET = self.unix_socket_path)
+ self.process = await subprocess.create_subprocess_exec(join(self.plugin_directory, self.file_name), env=env)
+ get_event_loop().create_task(self.process_loop())
+
+ async def process_loop(self):
+ await self.process.wait()
+ self.logger.info("backend process was killed - restarting in 10 seconds")
+ await sleep(10)
+ await self.start()
+
+ def generate_socket_path():
+ tmp_dir = mkdtemp("decky-plugin")
+ os.chown(tmp_dir, 1000, 1000)
+ return join(tmp_dir, "socket")
+
+ # called on the server/loader process
+ async def call_method(self, method_name, method_args):
+ if self.process.returncode == None:
+ return dict(success = False, result = "Process not alive")
+
+ return await self.server.call_method(method_name, method_args)
diff --git a/backend/plugin/passive_plugin.py b/backend/plugin/passive_plugin.py
new file mode 100644
index 00000000..9b7907bb
--- /dev/null
+++ b/backend/plugin/passive_plugin.py
@@ -0,0 +1,15 @@
+class PassivePlugin:
+ def __init__(self, logger) -> None:
+ self.logger
+ pass
+
+ def call_method(self, method_name, args):
+ self.logger.debug(f"Tried to call method {method_name}, but plugin is in passive mode")
+ pass
+
+ def execute_method(self, method_name, method_args):
+ self.logger.debug(f"Tried to execute method {method_name}, but plugin is in passive mode")
+ pass
+
+ async def start(self):
+ pass# Empty stub
diff --git a/backend/plugin/plugin.py b/backend/plugin/plugin.py
new file mode 100644
index 00000000..3e359ea7
--- /dev/null
+++ b/backend/plugin/plugin.py
@@ -0,0 +1,18 @@
+from posixpath import join
+
+from genericpath import isfile
+
+from plugin.binary_plugin import BinaryPlugin
+from plugin.passive_plugin import PassivePlugin
+from plugin.python_plugin import PythonPlugin
+
+
+def get_plugin_backend(spec, plugin_directory, flags, logger):
+ if spec == None and isfile(join(plugin_directory, "main.py")):
+ return PythonPlugin(plugin_directory, "main.py", flags, logger)
+ elif spec["type"] == "python":
+ return PythonPlugin(plugin_directory, spec["file"], flags, logger)
+ elif spec["type"] == "binary":
+ return BinaryPlugin(plugin_directory, spec["file"], flags, logger)
+ else:
+ return PassivePlugin(logger)
diff --git a/backend/plugin/python_plugin.py b/backend/plugin/python_plugin.py
new file mode 100644
index 00000000..c2833088
--- /dev/null
+++ b/backend/plugin/python_plugin.py
@@ -0,0 +1,117 @@
+import json
+import multiprocessing
+import os
+import uuid
+from asyncio import (Protocol, get_event_loop, new_event_loop, set_event_loop,
+ sleep)
+from importlib.util import module_from_spec, spec_from_file_location
+from posixpath import join
+from signal import SIGINT, signal
+from tempfile import mkdtemp
+
+from plugin_protocol import PluginProtocolServer
+
+multiprocessing.set_start_method("fork")
+
+# only useable by the python backend
+class PluginProtocolClient(Protocol):
+ def __init__(self, backend, logger) -> None:
+ super().__init__()
+ self.backend = backend
+ self.logger = logger
+
+ def connection_made(self, transport):
+ self.transport = transport
+
+ def data_received(self, data: bytes) -> None:
+ message = json.loads(data.decode("utf-8"))
+ message_id = str(uuid.UUID(message["id"]))
+ message_type = message["type"]
+ payload = message["payload"]
+
+ self.logger.debug(f"received {message_id} {message_type} {payload}")
+ if message_type == "method_call":
+ get_event_loop().create_task(self.handle_method_call(message_id, payload["name"], payload["args"]))
+
+ async def handle_method_call(self, message_id, method_name, method_args):
+ try:
+ result = await self.backend.execute_method(method_name, method_args)
+ self.respond_message(message_id, "method_response", dict(success = True, result = result))
+ except AttributeError as e:
+ self.respond_message(message_id, "method_response", dict(success = False, result = f"plugin does not expose a method called {method_name}"))
+ except Exception as e:
+ self.respond_message(message_id, "method_response", dict(success = False, result = str(e)))
+
+ def respond_message(self, message_id, message_type, payload):
+ self.logger.debug(f"sending {message_id} {message_type} {payload}")
+ message = json.dumps(dict(id = str(message_id), type = message_type, payload = payload))
+ self.transport.write(message.encode('utf-8'))
+
+
+class PythonPlugin:
+ def __init__(self, plugin_directory, file_name, flags, logger) -> None:
+ self.client = PluginProtocolClient(self, logger)
+ self.server = PluginProtocolServer(self)
+ self.connection = None
+
+ self.plugin_directory = plugin_directory
+ self.file_name = file_name
+ self.flags = flags
+ self.logger = logger
+
+ def _init(self):
+ self.logger.debug(f"child process Initializing")
+ signal(SIGINT, lambda s, f: exit(0))
+
+ set_event_loop(new_event_loop())
+ # TODO: both processes can access the socket
+ # setuid(0 if "root" in self.flags else 1000)
+ spec = spec_from_file_location("_", join(self.plugin_directory, self.file_name))
+ module = module_from_spec(spec)
+ spec.loader.exec_module(module)
+ self.Plugin = module.Plugin
+
+ if hasattr(self.Plugin, "_main"):
+ self.logger.debug("Found _main, calling it")
+ get_event_loop().create_task(self.Plugin._main(self.Plugin))
+
+ get_event_loop().create_task(self._connect())
+ get_event_loop().run_forever()
+
+ async def _connect(self):
+ self.logger.debug(f"connecting to unix server on {self.unix_socket_path}")
+ await get_event_loop().create_unix_connection(lambda: self.client, path=self.unix_socket_path)
+
+ async def start(self):
+ if self.connection:
+ self.connection.close()
+
+ self.unix_socket_path = PythonPlugin.generate_socket_path()
+ self.logger.debug(f"starting unix server on {self.unix_socket_path}")
+ self.connection = await get_event_loop().create_unix_server(lambda: self.server, path=self.unix_socket_path)
+
+ self.process = multiprocessing.Process(target=self._init)
+ self.process.start()
+ get_event_loop().create_task(self.process_loop())
+
+ async def process_loop(self):
+ await get_event_loop().run_in_executor(None, self.process.join)
+ self.logger.info("backend process was killed - restarting in 10 seconds")
+ await sleep(10)
+ await self.start()
+
+ # called on the server/loader process
+ async def call_method(self, method_name, method_args):
+ if not self.process.is_alive():
+ return dict(success = False, result = "Process not alive")
+
+ return await self.server.call_method(method_name, method_args)
+
+ # called on the client
+ def execute_method(self, method_name, method_args):
+ return getattr(self.Plugin, method_name)(self.Plugin, **method_args)
+
+ def generate_socket_path():
+ tmp_dir = mkdtemp("decky-plugin")
+ os.chown(tmp_dir, 1000, 1000)
+ return join(tmp_dir, "socket")
diff --git a/backend/plugin_protocol.py b/backend/plugin_protocol.py
new file mode 100644
index 00000000..445a1e3a
--- /dev/null
+++ b/backend/plugin_protocol.py
@@ -0,0 +1,46 @@
+import json
+import uuid
+from asyncio import Protocol, TimeoutError, get_event_loop, wait_for
+from gc import callbacks
+from subprocess import call
+
+
+class PluginProtocolServer(Protocol):
+ def __init__(self, backend) -> None:
+ super().__init__()
+ self.backend = backend
+ self.callbacks = {}
+
+ def connection_made(self, transport):
+ self.transport = transport
+
+ def data_received(self, data: bytes) -> None:
+ message = json.loads(data.decode("utf-8"))
+ message_id = str(uuid.UUID(message["id"]))
+ message_type = message["type"]
+ payload = message["payload"]
+
+ if message_type == "method_response":
+ get_event_loop().create_task(self.handle_method_response(message_id, payload["success"], payload["result"]))
+
+ async def handle_method_response(self, message_id, success, result):
+ if message_id in self.callbacks:
+ self.callbacks[message_id].set_result(dict(success = success, result = result))
+ del self.callbacks[message_id]
+
+ async def send_message(self, type, payload):
+ id = str(uuid.uuid4())
+ callback = get_event_loop().create_future()
+ message = json.dumps(dict(id = id, type = type, payload = payload))
+
+ self.callbacks[id] = callback
+ self.transport.write(message.encode('utf-8'))
+
+ try:
+ return await wait_for(callback, 10)
+ except TimeoutError as e:
+ del self.callbacks[id]
+ raise e
+
+ def call_method(self, method_name, method_args):
+ return self.send_message("method_call", dict(name = method_name, args = method_args))
diff --git a/backend/plugin_wrapper.py b/backend/plugin_wrapper.py
new file mode 100644
index 00000000..d41454df
--- /dev/null
+++ b/backend/plugin_wrapper.py
@@ -0,0 +1,34 @@
+import multiprocessing
+from json import load
+from logging import getLogger
+from os import path
+
+from plugin.plugin import get_plugin_backend
+
+
+class PluginWrapper:
+ def __init__(self, plugin_relative_directory, plugin_path) -> None:
+ self.plugin_directory = path.join(plugin_path, plugin_relative_directory)
+
+ json = load(open(path.join(self.plugin_directory, "plugin.json"), "r"))
+
+ self.legacy = False
+ self.main_view_html = json["main_view_html"] if "main_view_html" in json else ""
+ self.tile_view_html = json["tile_view_html"] if "tile_view_html" in json else ""
+ self.legacy = self.main_view_html or self.tile_view_html
+
+ self.name = json["name"]
+ self.author = json["author"]
+ self.flags = json["flags"]
+ self.logger = getLogger(f"{self.name}")
+
+ self.backend = get_plugin_backend(json.get("backend"), self.plugin_directory, self.flags, self.logger)
+
+ def call_method(self, method_name, args):
+ return self.backend.call_method(method_name, args)
+
+ def start(self):
+ return self.backend.start()
+
+ def __str__(self) -> str:
+ return self.name