1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
|
import uuid
from json.decoder import JSONDecodeError
from aiohttp import ClientSession, web
from injector import inject_to_tab
import helpers
class Utilities:
def __init__(self, context) -> None:
self.context = context
self.util_methods = {
"ping": self.ping,
"http_request": self.http_request,
"cancel_plugin_install": self.cancel_plugin_install,
"confirm_plugin_install": self.confirm_plugin_install,
"execute_in_tab": self.execute_in_tab,
"inject_css_into_tab": self.inject_css_into_tab,
"remove_css_from_tab": self.remove_css_from_tab
}
if context:
context.web_app.add_routes([
web.post("/methods/{method_name}", self._handle_server_method_call)
])
async def _handle_server_method_call(self, request):
method_name = request.match_info["method_name"]
try:
args = await request.json()
except JSONDecodeError:
args = {}
res = {}
try:
r = await self.util_methods[method_name](**args)
res["result"] = r
res["success"] = True
except Exception as e:
res["result"] = str(e)
res["success"] = False
return web.json_response(res)
async def confirm_plugin_install(self, request_id):
return await self.context.plugin_browser.confirm_plugin_install(request_id)
def cancel_plugin_install(self, request_id):
return self.context.plugin_browser.cancel_plugin_install(request_id)
async def http_request(self, method="", url="", **kwargs):
async with ClientSession() as web:
async with web.request(method, url, ssl=helpers.get_ssl_context(), **kwargs) as res:
return {
"status": res.status,
"headers": dict(res.headers),
"body": await res.text()
}
async def ping(self, **kwargs):
return "pong"
async def execute_in_tab(self, tab, run_async, code):
try:
result = await inject_to_tab(tab, code, run_async)
if "exceptionDetails" in result["result"]:
return {
"success": False,
"result": result["result"]
}
return {
"success": True,
"result" : result["result"]["result"].get("value")
}
except Exception as e:
return {
"success": False,
"result": e
}
async def inject_css_into_tab(self, tab, style):
try:
css_id = str(uuid.uuid4())
result = await inject_to_tab(tab,
f"""
(function() {{
const style = document.createElement('style');
style.id = "{css_id}";
document.head.append(style);
style.textContent = `{style}`;
}})()
""", False)
if "exceptionDetails" in result["result"]:
return {
"success": False,
"result": result["result"]
}
return {
"success": True,
"result" : css_id
}
except Exception as e:
return {
"success": False,
"result": e
}
async def remove_css_from_tab(self, tab, css_id):
try:
result = await inject_to_tab(tab,
f"""
(function() {{
let style = document.getElementById("{css_id}");
if (style.nodeName.toLowerCase() == 'style')
style.parentNode.removeChild(style);
}})()
""", False)
if "exceptionDetails" in result["result"]:
return {
"success": False,
"result": result
}
return {
"success": True
}
except Exception as e:
return {
"success": False,
"result": e
}
|