summaryrefslogtreecommitdiff
path: root/backend/helpers.py
diff options
context:
space:
mode:
Diffstat (limited to 'backend/helpers.py')
-rw-r--r--backend/helpers.py70
1 files changed, 57 insertions, 13 deletions
diff --git a/backend/helpers.py b/backend/helpers.py
index 7cab512b..d817f0b9 100644
--- a/backend/helpers.py
+++ b/backend/helpers.py
@@ -6,8 +6,6 @@ import subprocess
import uuid
import os
import sys
-from subprocess import check_output
-from time import sleep
from hashlib import sha256
from io import BytesIO
@@ -24,22 +22,38 @@ ssl_ctx = ssl.create_default_context(cafile=certifi.where())
assets_regex = re.compile("^/plugins/.*/assets/.*")
frontend_regex = re.compile("^/frontend/.*")
+
def get_ssl_context():
return ssl_ctx
+
def get_csrf_token():
return csrf_token
+
@middleware
async def csrf_middleware(request, handler):
- if str(request.method) == "OPTIONS" or request.headers.get('Authentication') == csrf_token or str(request.rel_url) == "/auth/token" or str(request.rel_url).startswith("/plugins/load_main/") or str(request.rel_url).startswith("/static/") or str(request.rel_url).startswith("/legacy/") or str(request.rel_url).startswith("/steam_resource/") or str(request.rel_url).startswith("/frontend/") or assets_regex.match(str(request.rel_url)) or frontend_regex.match(str(request.rel_url)):
+ if (
+ str(request.method) == "OPTIONS"
+ or request.headers.get("Authentication") == csrf_token
+ or str(request.rel_url) == "/auth/token"
+ or str(request.rel_url).startswith("/plugins/load_main/")
+ or str(request.rel_url).startswith("/static/")
+ or str(request.rel_url).startswith("/legacy/")
+ or str(request.rel_url).startswith("/steam_resource/")
+ or str(request.rel_url).startswith("/frontend/")
+ or assets_regex.match(str(request.rel_url))
+ or frontend_regex.match(str(request.rel_url))
+ ):
return await handler(request)
- return Response(text='Forbidden', status='403')
+ return Response(text="Forbidden", status="403")
+
# Deprecated
def set_user():
pass
+
# Get the user id hosting the plugin loader
def get_user_id() -> int:
proc_path = os.path.realpath(sys.argv[0])
@@ -47,60 +61,75 @@ def get_user_id() -> int:
for pw in pws:
if proc_path.startswith(os.path.realpath(pw.pw_dir)):
return pw.pw_uid
- raise PermissionError("The plugin loader does not seem to be hosted by any known user.")
+ raise PermissionError(
+ "The plugin loader does not seem to be hosted by any known user."
+ )
+
# Get the user hosting the plugin loader
def get_user() -> str:
return pwd.getpwuid(get_user_id()).pw_name
+
# Get the effective user id of the running process
def get_effective_user_id() -> int:
return os.geteuid()
+
# Get the effective user of the running process
def get_effective_user() -> str:
return pwd.getpwuid(get_effective_user_id()).pw_name
+
# Get the effective user group id of the running process
def get_effective_user_group_id() -> int:
return os.getegid()
+
# Get the effective user group of the running process
def get_effective_user_group() -> str:
return grp.getgrgid(get_effective_user_group_id()).gr_name
+
# Get the user owner of the given file path.
def get_user_owner(file_path) -> str:
return pwd.getpwuid(os.stat(file_path).st_uid).pw_name
+
# Get the user group of the given file path.
def get_user_group(file_path) -> str:
return grp.getgrgid(os.stat(file_path).st_gid).gr_name
+
# Deprecated
def set_user_group() -> str:
return get_user_group()
+
# Get the group id of the user hosting the plugin loader
def get_user_group_id() -> int:
return pwd.getpwuid(get_user_id()).pw_gid
+
# Get the group of the user hosting the plugin loader
def get_user_group() -> str:
return grp.getgrgid(get_user_group_id()).gr_name
+
# Get the default home path unless a user is specified
-def get_home_path(username = None) -> str:
- if username == None:
+def get_home_path(username=None) -> str:
+ if username is None:
username = get_user()
return pwd.getpwnam(username).pw_dir
+
# Get the default homebrew path unless a home_path is specified
-def get_homebrew_path(home_path = None) -> str:
- if home_path == None:
+def get_homebrew_path(home_path=None) -> str:
+ if home_path is None:
home_path = get_home_path()
return os.path.join(home_path, "homebrew")
+
# Recursively create path and chown as user
def mkdir_as_user(path):
path = os.path.realpath(path)
@@ -113,11 +142,17 @@ def mkdir_as_user(path):
chown_path = os.path.join(chown_path, p)
os.chown(chown_path, uid, gid)
+
# Fetches the version of loader
def get_loader_version() -> str:
- with open(os.path.join(os.path.dirname(sys.argv[0]), ".loader.version"), "r", encoding="utf-8") as version_file:
+ with open(
+ os.path.join(os.path.dirname(sys.argv[0]), ".loader.version"),
+ "r",
+ encoding="utf-8",
+ ) as version_file:
return version_file.readline().replace("\n", "")
+
# Download Remote Binaries to local Plugin
async def download_remote_binary_to_path(url, binHash, path) -> bool:
rv = False
@@ -130,11 +165,13 @@ async def download_remote_binary_to_path(url, binHash, path) -> bool:
remoteHash = sha256(data.getbuffer()).hexdigest()
if binHash == remoteHash:
data.seek(0)
- with open(path, 'wb') as f:
+ with open(path, "wb") as f:
f.write(data.getbuffer())
rv = True
else:
- raise Exception(f"Fatal Error: Hash Mismatch for remote binary {path}@{url}")
+ raise Exception(
+ f"Fatal Error: Hash Mismatch for remote binary {path}@{url}"
+ )
else:
rv = False
except:
@@ -142,15 +179,22 @@ async def download_remote_binary_to_path(url, binHash, path) -> bool:
return rv
+
async def is_systemd_unit_active(unit_name: str) -> bool:
- res = subprocess.run(["systemctl", "is-active", unit_name], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
+ res = subprocess.run(
+ ["systemctl", "is-active", unit_name],
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL,
+ )
return res.returncode == 0
+
async def stop_systemd_unit(unit_name: str) -> subprocess.CompletedProcess:
cmd = ["systemctl", "stop", unit_name]
return subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+
async def start_systemd_unit(unit_name: str) -> subprocess.CompletedProcess:
cmd = ["systemctl", "start", unit_name]