diff options
Diffstat (limited to 'backend/helpers.py')
| -rw-r--r-- | backend/helpers.py | 70 |
1 files changed, 57 insertions, 13 deletions
diff --git a/backend/helpers.py b/backend/helpers.py index 7cab512b..d817f0b9 100644 --- a/backend/helpers.py +++ b/backend/helpers.py @@ -6,8 +6,6 @@ import subprocess import uuid import os import sys -from subprocess import check_output -from time import sleep from hashlib import sha256 from io import BytesIO @@ -24,22 +22,38 @@ ssl_ctx = ssl.create_default_context(cafile=certifi.where()) assets_regex = re.compile("^/plugins/.*/assets/.*") frontend_regex = re.compile("^/frontend/.*") + def get_ssl_context(): return ssl_ctx + def get_csrf_token(): return csrf_token + @middleware async def csrf_middleware(request, handler): - if str(request.method) == "OPTIONS" or request.headers.get('Authentication') == csrf_token or str(request.rel_url) == "/auth/token" or str(request.rel_url).startswith("/plugins/load_main/") or str(request.rel_url).startswith("/static/") or str(request.rel_url).startswith("/legacy/") or str(request.rel_url).startswith("/steam_resource/") or str(request.rel_url).startswith("/frontend/") or assets_regex.match(str(request.rel_url)) or frontend_regex.match(str(request.rel_url)): + if ( + str(request.method) == "OPTIONS" + or request.headers.get("Authentication") == csrf_token + or str(request.rel_url) == "/auth/token" + or str(request.rel_url).startswith("/plugins/load_main/") + or str(request.rel_url).startswith("/static/") + or str(request.rel_url).startswith("/legacy/") + or str(request.rel_url).startswith("/steam_resource/") + or str(request.rel_url).startswith("/frontend/") + or assets_regex.match(str(request.rel_url)) + or frontend_regex.match(str(request.rel_url)) + ): return await handler(request) - return Response(text='Forbidden', status='403') + return Response(text="Forbidden", status="403") + # Deprecated def set_user(): pass + # Get the user id hosting the plugin loader def get_user_id() -> int: proc_path = os.path.realpath(sys.argv[0]) @@ -47,60 +61,75 @@ def get_user_id() -> int: for pw in pws: if proc_path.startswith(os.path.realpath(pw.pw_dir)): return pw.pw_uid - raise PermissionError("The plugin loader does not seem to be hosted by any known user.") + raise PermissionError( + "The plugin loader does not seem to be hosted by any known user." + ) + # Get the user hosting the plugin loader def get_user() -> str: return pwd.getpwuid(get_user_id()).pw_name + # Get the effective user id of the running process def get_effective_user_id() -> int: return os.geteuid() + # Get the effective user of the running process def get_effective_user() -> str: return pwd.getpwuid(get_effective_user_id()).pw_name + # Get the effective user group id of the running process def get_effective_user_group_id() -> int: return os.getegid() + # Get the effective user group of the running process def get_effective_user_group() -> str: return grp.getgrgid(get_effective_user_group_id()).gr_name + # Get the user owner of the given file path. def get_user_owner(file_path) -> str: return pwd.getpwuid(os.stat(file_path).st_uid).pw_name + # Get the user group of the given file path. def get_user_group(file_path) -> str: return grp.getgrgid(os.stat(file_path).st_gid).gr_name + # Deprecated def set_user_group() -> str: return get_user_group() + # Get the group id of the user hosting the plugin loader def get_user_group_id() -> int: return pwd.getpwuid(get_user_id()).pw_gid + # Get the group of the user hosting the plugin loader def get_user_group() -> str: return grp.getgrgid(get_user_group_id()).gr_name + # Get the default home path unless a user is specified -def get_home_path(username = None) -> str: - if username == None: +def get_home_path(username=None) -> str: + if username is None: username = get_user() return pwd.getpwnam(username).pw_dir + # Get the default homebrew path unless a home_path is specified -def get_homebrew_path(home_path = None) -> str: - if home_path == None: +def get_homebrew_path(home_path=None) -> str: + if home_path is None: home_path = get_home_path() return os.path.join(home_path, "homebrew") + # Recursively create path and chown as user def mkdir_as_user(path): path = os.path.realpath(path) @@ -113,11 +142,17 @@ def mkdir_as_user(path): chown_path = os.path.join(chown_path, p) os.chown(chown_path, uid, gid) + # Fetches the version of loader def get_loader_version() -> str: - with open(os.path.join(os.path.dirname(sys.argv[0]), ".loader.version"), "r", encoding="utf-8") as version_file: + with open( + os.path.join(os.path.dirname(sys.argv[0]), ".loader.version"), + "r", + encoding="utf-8", + ) as version_file: return version_file.readline().replace("\n", "") + # Download Remote Binaries to local Plugin async def download_remote_binary_to_path(url, binHash, path) -> bool: rv = False @@ -130,11 +165,13 @@ async def download_remote_binary_to_path(url, binHash, path) -> bool: remoteHash = sha256(data.getbuffer()).hexdigest() if binHash == remoteHash: data.seek(0) - with open(path, 'wb') as f: + with open(path, "wb") as f: f.write(data.getbuffer()) rv = True else: - raise Exception(f"Fatal Error: Hash Mismatch for remote binary {path}@{url}") + raise Exception( + f"Fatal Error: Hash Mismatch for remote binary {path}@{url}" + ) else: rv = False except: @@ -142,15 +179,22 @@ async def download_remote_binary_to_path(url, binHash, path) -> bool: return rv + async def is_systemd_unit_active(unit_name: str) -> bool: - res = subprocess.run(["systemctl", "is-active", unit_name], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + res = subprocess.run( + ["systemctl", "is-active", unit_name], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) return res.returncode == 0 + async def stop_systemd_unit(unit_name: str) -> subprocess.CompletedProcess: cmd = ["systemctl", "stop", unit_name] return subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + async def start_systemd_unit(unit_name: str) -> subprocess.CompletedProcess: cmd = ["systemctl", "start", unit_name] |
