summaryrefslogtreecommitdiff
path: root/backend/injector.py
diff options
context:
space:
mode:
Diffstat (limited to 'backend/injector.py')
-rw-r--r--backend/injector.py142
1 files changed, 114 insertions, 28 deletions
diff --git a/backend/injector.py b/backend/injector.py
index de914d17..c0d9cbad 100644
--- a/backend/injector.py
+++ b/backend/injector.py
@@ -3,6 +3,7 @@
from asyncio import sleep
from logging import getLogger
from traceback import format_exc
+from typing import List
from aiohttp import ClientSession
from aiohttp.client_exceptions import ClientConnectorError
@@ -18,6 +19,7 @@ class Tab:
def __init__(self, res) -> None:
self.title = res["title"]
self.id = res["id"]
+ self.url = res["url"]
self.ws_url = res["webSocketDebuggerUrl"]
self.websocket = None
@@ -64,6 +66,26 @@ class Tab:
await self.close_websocket()
return res
+ async def has_global_var(self, var_name, manage_socket=True):
+ res = await self.evaluate_js(f"window['{var_name}'] !== null && window['{var_name}'] !== undefined", False, manage_socket)
+
+ if not "result" in res or not "result" in res["result"] or not "value" in res["result"]["result"]:
+ return False
+
+ return res["result"]["result"]["value"]
+
+ async def close(self, manage_socket=True):
+ if manage_socket:
+ await self.open_websocket()
+
+ res = await self._send_devtools_cmd({
+ "method": "Page.close",
+ }, False)
+
+ if manage_socket:
+ await self.close_websocket()
+ return res
+
async def enable(self):
"""
Enables page domain notifications.
@@ -80,6 +102,18 @@ class Tab:
"method": "Page.disable",
}, False)
+ async def refresh(self):
+ if manage_socket:
+ await self.open_websocket()
+
+ await self._send_devtools_cmd({
+ "method": "Page.reload",
+ }, False)
+
+ if manage_socket:
+ await self.close_websocket()
+
+ return
async def reload_and_evaluate(self, js, manage_socket=True):
"""
Reloads the current tab, with JS to run on load via debugger
@@ -227,6 +261,71 @@ class Tab:
if manage_socket:
await self.close_websocket()
+ async def has_element(self, element_name, manage_socket=True):
+ res = await self.evaluate_js(f"document.getElementById('{element_name}') != null", False, manage_socket)
+
+ if not "result" in res or not "result" in res["result"] or not "value" in res["result"]["result"]:
+ return False
+
+ return res["result"]["result"]["value"]
+
+ async def inject_css(self, style, manage_socket=True):
+ try:
+ css_id = str(uuid.uuid4())
+
+ result = await self.evaluate_js(
+ f"""
+ (function() {{
+ const style = document.createElement('style');
+ style.id = "{css_id}";
+ document.head.append(style);
+ style.textContent = `{style}`;
+ }})()
+ """, False, manage_socket)
+
+ if "exceptionDetails" in result["result"]:
+ return {
+ "success": False,
+ "result": result["result"]
+ }
+
+ return {
+ "success": True,
+ "result": css_id
+ }
+ except Exception as e:
+ return {
+ "success": False,
+ "result": e
+ }
+
+ async def remove_css(self, css_id, manage_socket=True):
+ try:
+ result = await self.evaluate_js(
+ f"""
+ (function() {{
+ let style = document.getElementById("{css_id}");
+
+ if (style.nodeName.toLowerCase() == 'style')
+ style.parentNode.removeChild(style);
+ }})()
+ """, False, manage_socket)
+
+ if "exceptionDetails" in result["result"]:
+ return {
+ "success": False,
+ "result": result
+ }
+
+ return {
+ "success": True
+ }
+ except Exception as e:
+ return {
+ "success": False,
+ "result": e
+ }
+
async def get_steam_resource(self, url):
res = await self.evaluate_js(f'(async function test() {{ return await (await fetch("{url}")).text() }})()', True)
return res["result"]["result"]["value"]
@@ -235,7 +334,7 @@ class Tab:
return self.title
-async def get_tabs():
+async def get_tabs() -> List[Tab]:
async with ClientSession() as web:
res = {}
@@ -257,41 +356,28 @@ async def get_tabs():
raise Exception(f"/json did not return 200. {await res.text()}")
-async def get_tab(tab_name):
+async def get_tab(tab_name) -> Tab:
tabs = await get_tabs()
tab = next((i for i in tabs if i.title == tab_name), None)
if not tab:
raise ValueError(f"Tab {tab_name} not found")
return tab
+async def get_tab_lambda(test) -> Tab:
+ tabs = await get_tabs()
+ tab = next((i for i in tabs if test(i)), None)
+ if not tab:
+ raise ValueError(f"Tab not found by lambda")
+ return tab
+
+async def get_gamepadui_tab() -> Tab:
+ tabs = await get_tabs()
+ tab = next((i for i in tabs if ("https://steamloopback.host/routes/" in i.url and (i.title == "Steam" or i.title == "SP"))), None)
+ if not tab:
+ raise ValueError(f"GamepadUI Tab not found")
+ return tab
async def inject_to_tab(tab_name, js, run_async=False):
tab = await get_tab(tab_name)
return await tab.evaluate_js(js, run_async)
-
-
-async def tab_has_global_var(tab_name, var_name):
- try:
- tab = await get_tab(tab_name)
- except ValueError:
- return False
- res = await tab.evaluate_js(f"window['{var_name}'] !== null && window['{var_name}'] !== undefined", False)
-
- if not "result" in res or not "result" in res["result"] or not "value" in res["result"]["result"]:
- return False
-
- return res["result"]["result"]["value"]
-
-
-async def tab_has_element(tab_name, element_name):
- try:
- tab = await get_tab(tab_name)
- except ValueError:
- return False
- res = await tab.evaluate_js(f"document.getElementById('{element_name}') != null", False)
-
- if not "result" in res or not "result" in res["result"] or not "value" in res["result"]["result"]:
- return False
-
- return res["result"]["result"]["value"]