summaryrefslogtreecommitdiff
path: root/backend/helpers.py
blob: 35681f6889bc5b80f3e65b2175fad2df6d9652cc (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
import grp
import pwd
import re
import ssl
import subprocess
import uuid
import os
import sys
from subprocess import check_output
from time import sleep
from hashlib import sha256
from io import BytesIO

import certifi
from aiohttp.web import Response, middleware
from aiohttp import ClientSession

REMOTE_DEBUGGER_UNIT = "steam-web-debug-portforward.service"

# global vars
csrf_token = str(uuid.uuid4())
ssl_ctx = ssl.create_default_context(cafile=certifi.where())

assets_regex = re.compile("^/plugins/.*/assets/.*")
frontend_regex = re.compile("^/frontend/.*")

def get_ssl_context():
    return ssl_ctx

def get_csrf_token():
    return csrf_token

@middleware
async def csrf_middleware(request, handler):
    if str(request.method) == "OPTIONS" or request.headers.get('Authentication') == csrf_token or str(request.rel_url) == "/auth/token" or str(request.rel_url).startswith("/plugins/load_main/") or str(request.rel_url).startswith("/static/") or str(request.rel_url).startswith("/legacy/") or str(request.rel_url).startswith("/steam_resource/") or str(request.rel_url).startswith("/frontend/") or assets_regex.match(str(request.rel_url)) or frontend_regex.match(str(request.rel_url)):
        return await handler(request)
    return Response(text='Forbidden', status='403')

# Deprecated
def set_user():
    pass

# Get the user id hosting the plugin loader
def get_user_id() -> int:
    proc_path = os.path.realpath(sys.argv[0])
    pws = sorted(pwd.getpwall(), reverse=True, key=lambda pw: len(pw.pw_dir))
    for pw in pws:
        if proc_path.startswith(os.path.realpath(pw.pw_dir)):
            return pw.pw_uid
    raise PermissionError("The plugin loader does not seem to be hosted by any known user.")

# Get the user hosting the plugin loader
def get_user() -> str:
    return pwd.getpwuid(get_user_id()).pw_name

# Get the effective user id of the running process
def get_effective_user_id() -> int:
    return os.geteuid()

# Get the effective user of the running process
def get_effective_user() -> str:
    return pwd.getpwuid(get_effective_user_id()).pw_name

# Get the effective user group id of the running process
def get_effective_user_group_id() -> int:
    return os.getegid()

# Get the effective user group of the running process
def get_effective_user_group() -> str:
    return grp.getgrgid(get_effective_user_group_id()).gr_name

# Get the user owner of the given file path.
def get_user_owner(file_path) -> str:
    return pwd.getpwuid(os.stat(file_path).st_uid).pw_name

# Get the user group of the given file path.
def get_user_group(file_path) -> str:
    return grp.getgrgid(os.stat(file_path).st_gid).gr_name

# Deprecated
def set_user_group() -> str:
    return get_user_group()

# Get the group id of the user hosting the plugin loader
def get_user_group_id() -> int:
    return pwd.getpwuid(get_user_id()).pw_gid

# Get the group of the user hosting the plugin loader
def get_user_group() -> str:
    return grp.getgrgid(get_user_group_id()).gr_name

# Get the default home path unless a user is specified
def get_home_path(username = None) -> str:
    if username == None:
        username = get_user()
    return pwd.getpwnam(username).pw_dir

# Get the default homebrew path unless a home_path is specified
def get_homebrew_path(home_path = None) -> str:
    if home_path == None:
        home_path = get_home_path()
    return os.path.join(home_path, "homebrew")

# Recursively create path and chown as user
def mkdir_as_user(path):
    path = os.path.realpath(path)
    os.makedirs(path, exist_ok=True)
    chown_path = get_home_path()
    parts = os.path.relpath(path, chown_path).split(os.sep)
    uid = get_user_id()
    gid = get_user_group_id()
    for p in parts:
        chown_path = os.path.join(chown_path, p)
        os.chown(chown_path, uid, gid)

# Fetches the version of loader
def get_loader_version() -> str:
    try:
        with open(os.path.join(os.path.dirname(sys.argv[0]), ".loader.version"), "r", encoding="utf-8") as version_file:
            return version_file.readline().replace("\n", "")
    except:
        return ""

# returns the appropriate system python paths
def get_system_pythonpaths() -> list[str]:
    # run as normal normal user to also include user python paths
    proc = subprocess.run(["python3", "-c", "import sys; print(':'.join(x for x in sys.path if x))"],
                          user=get_user_id(), env={}, capture_output=True)
    return proc.stdout.decode().strip().split(":")

# Download Remote Binaries to local Plugin
async def download_remote_binary_to_path(url, binHash, path) -> bool:
    rv = False
    try:
        if os.access(os.path.dirname(path), os.W_OK):
            async with ClientSession() as client:
                res = await client.get(url, ssl=get_ssl_context())
            if res.status == 200:
                data = BytesIO(await res.read())
                remoteHash = sha256(data.getbuffer()).hexdigest()
                if binHash == remoteHash:
                    data.seek(0)
                    with open(path, 'wb') as f:
                        f.write(data.getbuffer())
                        rv = True
                else:
                    raise Exception(f"Fatal Error: Hash Mismatch for remote binary {path}@{url}")
            else:
                rv = False
    except:
        rv = False

    return rv

async def is_systemd_unit_active(unit_name: str) -> bool:
    res = subprocess.run(["systemctl", "is-active", unit_name], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    return res.returncode == 0

async def stop_systemd_unit(unit_name: str) -> subprocess.CompletedProcess:
    cmd = ["systemctl", "stop", unit_name]

    return subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

async def start_systemd_unit(unit_name: str) -> subprocess.CompletedProcess:
    cmd = ["systemctl", "start", unit_name]

    return subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)