diff options
Diffstat (limited to 'backend/plugin.py')
| -rw-r--r-- | backend/plugin.py | 36 |
1 files changed, 31 insertions, 5 deletions
diff --git a/backend/plugin.py b/backend/plugin.py index 04ba285d..b16d40d8 100644 --- a/backend/plugin.py +++ b/backend/plugin.py @@ -1,7 +1,7 @@ import multiprocessing from asyncio import (Lock, get_event_loop, new_event_loop, open_unix_connection, set_event_loop, sleep, - start_unix_server) + start_unix_server, IncompleteReadError, LimitOverrunError) from concurrent.futures import ProcessPoolExecutor from importlib.util import module_from_spec, spec_from_file_location from json import dumps, load, loads @@ -12,6 +12,8 @@ from time import time multiprocessing.set_start_method("fork") +BUFFER_LIMIT = 2 ** 20 # 1 MiB + class PluginWrapper: def __init__(self, file, plugin_directory, plugin_path) -> None: self.file = file @@ -62,11 +64,23 @@ class PluginWrapper: get_event_loop().run_forever() async def _setup_socket(self): - self.socket = await start_unix_server(self._listen_for_method_call, path=self.socket_addr) + self.socket = await start_unix_server(self._listen_for_method_call, path=self.socket_addr, limit=BUFFER_LIMIT) async def _listen_for_method_call(self, reader, writer): while True: - data = loads((await reader.readline()).decode("utf-8")) + line = bytearray() + while True: + try: + line.extend(await reader.readuntil()) + except LimitOverrunError: + line.extend(await reader.read(reader._limit)) + continue + except IncompleteReadError as err: + line.extend(err.partial) + break + else: + break + data = loads(line.decode("utf-8")) if "stop" in data: get_event_loop().stop() while get_event_loop().is_running(): @@ -88,7 +102,7 @@ class PluginWrapper: retries = 0 while retries < 10: try: - self.reader, self.writer = await open_unix_connection(self.socket_addr) + self.reader, self.writer = await open_unix_connection(self.socket_addr, limit=BUFFER_LIMIT) return True except: await sleep(2) @@ -121,7 +135,19 @@ class PluginWrapper: self.writer.write( (dumps({"method": method_name, "args": kwargs})+"\n").encode("utf-8")) await self.writer.drain() - res = loads((await self.reader.readline()).decode("utf-8")) + line = bytearray() + while True: + try: + line.extend(await self.reader.readuntil()) + except LimitOverrunError: + line.extend(await self.reader.read(self.reader._limit)) + continue + except IncompleteReadError as err: + line.extend(err.partial) + break + else: + break + res = loads(line.decode("utf-8")) if not res["success"]: raise Exception(res["res"]) return res["res"] |
