summaryrefslogtreecommitdiff
path: root/backend/injector.py
diff options
context:
space:
mode:
Diffstat (limited to 'backend/injector.py')
-rw-r--r--backend/injector.py84
1 files changed, 50 insertions, 34 deletions
diff --git a/backend/injector.py b/backend/injector.py
index e3414fee..a217f689 100644
--- a/backend/injector.py
+++ b/backend/injector.py
@@ -2,10 +2,9 @@
from asyncio import sleep
from logging import getLogger
-from traceback import format_exc
-from typing import List
+from typing import Any, Callable, List, TypedDict, Dict
-from aiohttp import ClientSession, WSMsgType
+from aiohttp import ClientSession
from aiohttp.client_exceptions import ClientConnectorError, ClientOSError
from asyncio.exceptions import TimeoutError
import uuid
@@ -14,35 +13,43 @@ BASE_ADDRESS = "http://localhost:8080"
logger = getLogger("Injector")
+class _TabResponse(TypedDict):
+ title: str
+ id: str
+ url: str
+ webSocketDebuggerUrl: str
class Tab:
cmd_id = 0
- def __init__(self, res) -> None:
- self.title = res["title"]
- self.id = res["id"]
- self.url = res["url"]
- self.ws_url = res["webSocketDebuggerUrl"]
+ def __init__(self, res: _TabResponse) -> None:
+ self.title: str = res["title"]
+ self.id: str = res["id"]
+ self.url: str = res["url"]
+ self.ws_url: str = res["webSocketDebuggerUrl"]
self.websocket = None
self.client = None
async def open_websocket(self):
self.client = ClientSession()
- self.websocket = await self.client.ws_connect(self.ws_url)
+ self.websocket = await self.client.ws_connect(self.ws_url) # type: ignore
async def close_websocket(self):
- await self.websocket.close()
- await self.client.close()
+ if self.websocket:
+ await self.websocket.close()
+ if self.client:
+ await self.client.close()
async def listen_for_message(self):
- async for message in self.websocket:
- data = message.json()
- yield data
- logger.warn(f"The Tab {self.title} socket has been disconnected while listening for messages.")
- await self.close_websocket()
+ if self.websocket:
+ async for message in self.websocket:
+ data = message.json()
+ yield data
+ logger.warn(f"The Tab {self.title} socket has been disconnected while listening for messages.")
+ await self.close_websocket()
- async def _send_devtools_cmd(self, dc, receive=True):
+ async def _send_devtools_cmd(self, dc: Dict[str, Any], receive: bool = True):
if self.websocket:
self.cmd_id += 1
dc["id"] = self.cmd_id
@@ -54,7 +61,7 @@ class Tab:
return None
raise RuntimeError("Websocket not opened")
- async def evaluate_js(self, js, run_async=False, manage_socket=True, get_result=True):
+ async def evaluate_js(self, js: str, run_async: bool | None = False, manage_socket: bool | None = True, get_result: bool = True):
try:
if manage_socket:
await self.open_websocket()
@@ -73,15 +80,16 @@ class Tab:
await self.close_websocket()
return res
- async def has_global_var(self, var_name, manage_socket=True):
+ async def has_global_var(self, var_name: str, manage_socket: bool = True):
res = await self.evaluate_js(f"window['{var_name}'] !== null && window['{var_name}'] !== undefined", False, manage_socket)
+ assert res is not None
if not "result" in res or not "result" in res["result"] or not "value" in res["result"]["result"]:
return False
return res["result"]["result"]["value"]
- async def close(self, manage_socket=True):
+ async def close(self, manage_socket: bool = True):
try:
if manage_socket:
await self.open_websocket()
@@ -111,7 +119,7 @@ class Tab:
"method": "Page.disable",
}, False)
- async def refresh(self, manage_socket=True):
+ async def refresh(self, manage_socket: bool = True):
try:
if manage_socket:
await self.open_websocket()
@@ -125,7 +133,7 @@ class Tab:
await self.close_websocket()
return
- async def reload_and_evaluate(self, js, manage_socket=True):
+ async def reload_and_evaluate(self, js: str, manage_socket: bool = True):
"""
Reloads the current tab, with JS to run on load via debugger
"""
@@ -153,11 +161,13 @@ class Tab:
}
}, True)
+ assert breakpoint_res is not None
+
logger.info(breakpoint_res)
# Page finishes loading when breakpoint hits
- for x in range(20):
+ for _ in range(20):
# this works around 1/5 of the time, so just send it 8 times.
# the js accounts for being injected multiple times allowing only one instance to run at a time anyway
await self._send_devtools_cmd({
@@ -176,7 +186,7 @@ class Tab:
}
}, False)
- for x in range(4):
+ for _ in range(4):
await self._send_devtools_cmd({
"method": "Debugger.resume"
}, False)
@@ -190,7 +200,7 @@ class Tab:
await self.close_websocket()
return
- async def add_script_to_evaluate_on_new_document(self, js, add_dom_wrapper=True, manage_socket=True, get_result=True):
+ async def add_script_to_evaluate_on_new_document(self, js: str, add_dom_wrapper: bool = True, manage_socket: bool = True, get_result: bool = True):
"""
How the underlying call functions is not particularly clear from the devtools docs, so stealing puppeteer's description:
@@ -253,7 +263,7 @@ class Tab:
await self.close_websocket()
return res
- async def remove_script_to_evaluate_on_new_document(self, script_id, manage_socket=True):
+ async def remove_script_to_evaluate_on_new_document(self, script_id: str, manage_socket: bool = True):
"""
Removes a script from a page that was added with `add_script_to_evaluate_on_new_document`
@@ -267,7 +277,7 @@ class Tab:
if manage_socket:
await self.open_websocket()
- res = await self._send_devtools_cmd({
+ await self._send_devtools_cmd({
"method": "Page.removeScriptToEvaluateOnNewDocument",
"params": {
"identifier": script_id
@@ -278,15 +288,16 @@ class Tab:
if manage_socket:
await self.close_websocket()
- async def has_element(self, element_name, manage_socket=True):
+ async def has_element(self, element_name: str, manage_socket: bool = True):
res = await self.evaluate_js(f"document.getElementById('{element_name}') != null", False, manage_socket)
+ assert res is not None
if not "result" in res or not "result" in res["result"] or not "value" in res["result"]["result"]:
return False
return res["result"]["result"]["value"]
- async def inject_css(self, style, manage_socket=True):
+ async def inject_css(self, style: str, manage_socket: bool = True):
try:
css_id = str(uuid.uuid4())
@@ -300,6 +311,8 @@ class Tab:
}})()
""", False, manage_socket)
+ assert result is not None
+
if "exceptionDetails" in result["result"]:
return {
"success": False,
@@ -316,7 +329,7 @@ class Tab:
"result": e
}
- async def remove_css(self, css_id, manage_socket=True):
+ async def remove_css(self, css_id: str, manage_socket: bool = True):
try:
result = await self.evaluate_js(
f"""
@@ -328,6 +341,8 @@ class Tab:
}})()
""", False, manage_socket)
+ assert result is not None
+
if "exceptionDetails" in result["result"]:
return {
"success": False,
@@ -343,8 +358,9 @@ class Tab:
"result": e
}
- async def get_steam_resource(self, url):
+ async def get_steam_resource(self, url: str):
res = await self.evaluate_js(f'(async function test() {{ return await (await fetch("{url}")).text() }})()', True)
+ assert res is not None
return res["result"]["result"]["value"]
def __repr__(self):
@@ -380,14 +396,14 @@ async def get_tabs() -> List[Tab]:
raise Exception(f"/json did not return 200. {await res.text()}")
-async def get_tab(tab_name) -> Tab:
+async def get_tab(tab_name: str) -> Tab:
tabs = await get_tabs()
tab = next((i for i in tabs if i.title == tab_name), None)
if not tab:
raise ValueError(f"Tab {tab_name} not found")
return tab
-async def get_tab_lambda(test) -> Tab:
+async def get_tab_lambda(test: Callable[[Tab], bool]) -> Tab:
tabs = await get_tabs()
tab = next((i for i in tabs if test(i)), None)
if not tab:
@@ -408,7 +424,7 @@ async def get_gamepadui_tab() -> Tab:
raise ValueError(f"GamepadUI Tab not found")
return tab
-async def inject_to_tab(tab_name, js, run_async=False):
+async def inject_to_tab(tab_name: str, js: str, run_async: bool = False):
tab = await get_tab(tab_name)
return await tab.evaluate_js(js, run_async)