1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
|
import asyncio
import grp
import os
import pwd
import sys
from subprocess import DEVNULL, PIPE, STDOUT, call, run

from customtypes import UserType
# Get the user id hosting the plugin loader
def _get_user_id() -> int:
proc_path = os.path.realpath(sys.argv[0])
pws = sorted(pwd.getpwall(), reverse=True, key=lambda pw: len(pw.pw_dir))
for pw in pws:
if proc_path.startswith(os.path.realpath(pw.pw_dir)):
return pw.pw_uid
raise PermissionError("The plugin loader does not seem to be hosted by any known user.")
# Get the user hosting the plugin loader
def _get_user() -> str:
    """Return the login name belonging to the uid reported by ``_get_user_id``."""
    host_entry = pwd.getpwuid(_get_user_id())
    return host_entry.pw_name
# Get the effective user id of the running process
def _get_effective_user_id() -> int:
return os.geteuid()
# Get the effective user of the running process
def _get_effective_user() -> str:
    """Return the login name matching the process's effective uid."""
    effective_entry = pwd.getpwuid(_get_effective_user_id())
    return effective_entry.pw_name
# Get the effective user group id of the running process
def _get_effective_user_group_id() -> int:
return os.getegid()
# Get the effective user group of the running process
def _get_effective_user_group() -> str:
    """Return the group name matching the process's effective gid."""
    effective_group = grp.getgrgid(_get_effective_user_group_id())
    return effective_group.gr_name
# Get the user owner of the given file path.
def _get_user_owner(file_path) -> str:
return pwd.getpwuid(os.stat(file_path).st_uid).pw_name
# Get the user group of the given file path.
def _get_user_group(file_path) -> str:
return grp.getgrgid(os.stat(file_path).st_gid).gr_name
# Get the group id of the user hosting the plugin loader
def _get_user_group_id() -> int:
    """Return the primary gid recorded in the passwd entry of the hosting user."""
    host_entry = pwd.getpwuid(_get_user_id())
    return host_entry.pw_gid
# Get the group of the user hosting the plugin loader
def _get_user_group() -> str:
return grp.getgrgid(_get_user_group_id()).gr_name
def chown(path : str, user : UserType = UserType.HOST_USER, recursive : bool = True) -> bool:
    """Change the owner and group of ``path`` by running the ``chown`` command.

    Args:
        path: file or directory to change.
        user: ``UserType.HOST_USER`` uses the hosting user and their group;
            ``UserType.EFFECTIVE_USER`` uses the process's effective user and
            group. Any other value raises.
        recursive: pass ``-R`` to ``chown`` when true.

    Returns:
        True when ``chown`` exits with status 0.

    Raises:
        Exception: for an unsupported ``user`` value.
    """
    if user == UserType.HOST_USER:
        owner_spec = f"{_get_user()}:{_get_user_group()}"
    elif user == UserType.EFFECTIVE_USER:
        owner_spec = f"{_get_effective_user()}:{_get_effective_user_group()}"
    else:
        raise Exception("Unknown User Type")

    command = ["chown"]
    if recursive:
        command.append("-R")
    command.extend([owner_spec, path])
    return call(command) == 0
def chmod(path : str, permissions : int, recursive : bool = True) -> bool:
    """Change the mode of ``path`` by running the ``chmod`` command.

    Args:
        path: file or directory to change.
        permissions: mode passed to ``chmod`` as its decimal string form,
            i.e. ``755`` is sent as ``"755"``.
        recursive: pass ``-R`` to ``chmod`` when true.

    Returns:
        True when ``chmod`` exits with status 0.
    """
    mode_arg = str(permissions)
    flags = ["-R"] if recursive else []
    exit_code = call(["chmod", *flags, mode_arg, path])
    return exit_code == 0
def folder_owner(path : str) -> UserType|None:
    """Classify the owner of ``path`` as one of the known user types.

    Returns:
        ``UserType.HOST_USER`` when the owner is the hosting user,
        ``UserType.EFFECTIVE_USER`` when it is the process's effective user,
        otherwise ``None``. The hosting user is checked first.
    """
    owner_name = _get_user_owner(path)
    if owner_name == _get_user():
        return UserType.HOST_USER
    if owner_name == _get_effective_user():
        return UserType.EFFECTIVE_USER
    return None
def get_home_path(user : UserType = UserType.HOST_USER) -> str:
    """Return the home directory recorded in the passwd database for ``user``.

    Args:
        user: ``HOST_USER`` (default), ``EFFECTIVE_USER`` or ``ROOT``.

    Raises:
        Exception: for any other ``user`` value.
    """
    if user == UserType.HOST_USER:
        account = _get_user()
    elif user == UserType.EFFECTIVE_USER:
        account = _get_effective_user()
    elif user == UserType.ROOT:
        account = "root"
    else:
        raise Exception("Unknown User Type")

    account_entry = pwd.getpwnam(account)
    return account_entry.pw_dir
def get_username() -> str:
    """Return the login name of the user hosting the plugin loader."""
    host_user = _get_user()
    return host_user
def setgid(user : UserType = UserType.HOST_USER):
    """Set the process group id via ``os.setgid``.

    Args:
        user: ``HOST_USER`` (default) selects the hosting user's primary
            group; ``ROOT`` selects gid 0.

    Raises:
        Exception: for any other ``user`` value.
    """
    if user == UserType.HOST_USER:
        target_gid = _get_user_group_id()
    elif user == UserType.ROOT:
        target_gid = 0
    else:
        raise Exception("Unknown user type")
    os.setgid(target_gid)
def setuid(user : UserType = UserType.HOST_USER):
    """Set the process user id via ``os.setuid``.

    Args:
        user: ``HOST_USER`` (default) selects the hosting user's uid;
            ``ROOT`` selects uid 0.

    Raises:
        Exception: for any other ``user`` value.
    """
    if user == UserType.HOST_USER:
        target_uid = _get_user_id()
    elif user == UserType.ROOT:
        target_uid = 0
    else:
        raise Exception("Unknown user type")
    os.setuid(target_uid)
async def service_active(service_name : str) -> bool:
    """Report whether ``systemctl is-active <service_name>`` succeeds.

    The command is run as an asyncio subprocess so the event loop is not
    blocked while waiting for it. Its output is discarded.

    Returns:
        True when the command exits with status 0.
    """
    proc = await asyncio.create_subprocess_exec(
        "systemctl", "is-active", service_name,
        stdout=DEVNULL, stderr=DEVNULL,
    )
    return (await proc.wait()) == 0
async def service_restart(service_name : str) -> bool:
    """Run ``systemctl daemon-reload`` and then restart ``service_name``.

    Both commands run as asyncio subprocesses so the event loop is not
    blocked while waiting for them. The exit status of ``daemon-reload`` is
    ignored; the restart command's combined output is captured and discarded.

    Returns:
        True when ``systemctl restart`` exits with status 0.
    """
    reload_proc = await asyncio.create_subprocess_exec("systemctl", "daemon-reload")
    await reload_proc.wait()

    proc = await asyncio.create_subprocess_exec(
        "systemctl", "restart", service_name,
        stdout=PIPE, stderr=STDOUT,
    )
    # communicate() drains the pipe, so a chatty child cannot fill it and stall.
    await proc.communicate()
    return proc.returncode == 0
async def service_stop(service_name : str) -> bool:
    """Stop ``service_name`` with ``systemctl stop``.

    The command runs as an asyncio subprocess so the event loop is not
    blocked while waiting for it. Its combined output is captured and
    discarded.

    Returns:
        True when the command exits with status 0.
    """
    proc = await asyncio.create_subprocess_exec(
        "systemctl", "stop", service_name,
        stdout=PIPE, stderr=STDOUT,
    )
    # communicate() drains the pipe, so a chatty child cannot fill it and stall.
    await proc.communicate()
    return proc.returncode == 0
async def service_start(service_name : str) -> bool:
    """Start ``service_name`` with ``systemctl start``.

    The command runs as an asyncio subprocess so the event loop is not
    blocked while waiting for it. Its combined output is captured and
    discarded.

    Returns:
        True when the command exits with status 0.
    """
    proc = await asyncio.create_subprocess_exec(
        "systemctl", "start", service_name,
        stdout=PIPE, stderr=STDOUT,
    )
    # communicate() drains the pipe, so a chatty child cannot fill it and stall.
    await proc.communicate()
    return proc.returncode == 0
|