From e2d708a6af0ec75c557b11d3a442af57240302b4 Mon Sep 17 00:00:00 2001 From: AAGaming Date: Sat, 26 Aug 2023 22:06:01 -0400 Subject: begin adding static types to backend code --- backend/injector.py | 84 +++++++++++++++++++++++++++++++---------------------- 1 file changed, 50 insertions(+), 34 deletions(-) (limited to 'backend/injector.py') diff --git a/backend/injector.py b/backend/injector.py index e3414fee..a217f689 100644 --- a/backend/injector.py +++ b/backend/injector.py @@ -2,10 +2,9 @@ from asyncio import sleep from logging import getLogger -from traceback import format_exc -from typing import List +from typing import Any, Callable, List, TypedDict, Dict -from aiohttp import ClientSession, WSMsgType +from aiohttp import ClientSession from aiohttp.client_exceptions import ClientConnectorError, ClientOSError from asyncio.exceptions import TimeoutError import uuid @@ -14,35 +13,43 @@ BASE_ADDRESS = "http://localhost:8080" logger = getLogger("Injector") +class _TabResponse(TypedDict): + title: str + id: str + url: str + webSocketDebuggerUrl: str class Tab: cmd_id = 0 - def __init__(self, res) -> None: - self.title = res["title"] - self.id = res["id"] - self.url = res["url"] - self.ws_url = res["webSocketDebuggerUrl"] + def __init__(self, res: _TabResponse) -> None: + self.title: str = res["title"] + self.id: str = res["id"] + self.url: str = res["url"] + self.ws_url: str = res["webSocketDebuggerUrl"] self.websocket = None self.client = None async def open_websocket(self): self.client = ClientSession() - self.websocket = await self.client.ws_connect(self.ws_url) + self.websocket = await self.client.ws_connect(self.ws_url) # type: ignore async def close_websocket(self): - await self.websocket.close() - await self.client.close() + if self.websocket: + await self.websocket.close() + if self.client: + await self.client.close() async def listen_for_message(self): - async for message in self.websocket: - data = message.json() - yield data - logger.warn(f"The Tab {self.title} socket has been disconnected while listening for messages.") - await self.close_websocket() + if self.websocket: + async for message in self.websocket: + data = message.json() + yield data + logger.warn(f"The Tab {self.title} socket has been disconnected while listening for messages.") + await self.close_websocket() - async def _send_devtools_cmd(self, dc, receive=True): + async def _send_devtools_cmd(self, dc: Dict[str, Any], receive: bool = True): if self.websocket: self.cmd_id += 1 dc["id"] = self.cmd_id @@ -54,7 +61,7 @@ class Tab: return None raise RuntimeError("Websocket not opened") - async def evaluate_js(self, js, run_async=False, manage_socket=True, get_result=True): + async def evaluate_js(self, js: str, run_async: bool | None = False, manage_socket: bool | None = True, get_result: bool = True): try: if manage_socket: await self.open_websocket() @@ -73,15 +80,16 @@ class Tab: await self.close_websocket() return res - async def has_global_var(self, var_name, manage_socket=True): + async def has_global_var(self, var_name: str, manage_socket: bool = True): res = await self.evaluate_js(f"window['{var_name}'] !== null && window['{var_name}'] !== undefined", False, manage_socket) + assert res is not None if not "result" in res or not "result" in res["result"] or not "value" in res["result"]["result"]: return False return res["result"]["result"]["value"] - async def close(self, manage_socket=True): + async def close(self, manage_socket: bool = True): try: if manage_socket: await self.open_websocket() @@ -111,7 +119,7 @@ class Tab: "method": "Page.disable", }, False) - async def refresh(self, manage_socket=True): + async def refresh(self, manage_socket: bool = True): try: if manage_socket: await self.open_websocket() @@ -125,7 +133,7 @@ class Tab: await self.close_websocket() return - async def reload_and_evaluate(self, js, manage_socket=True): + async def reload_and_evaluate(self, js: str, manage_socket: bool = True): """ Reloads the current tab, with JS to run on load via debugger """ @@ -153,11 +161,13 @@ class Tab: } }, True) + assert breakpoint_res is not None + logger.info(breakpoint_res) # Page finishes loading when breakpoint hits - for x in range(20): + for _ in range(20): # this works around 1/5 of the time, so just send it 8 times. # the js accounts for being injected multiple times allowing only one instance to run at a time anyway await self._send_devtools_cmd({ @@ -176,7 +186,7 @@ class Tab: } }, False) - for x in range(4): + for _ in range(4): await self._send_devtools_cmd({ "method": "Debugger.resume" }, False) @@ -190,7 +200,7 @@ class Tab: await self.close_websocket() return - async def add_script_to_evaluate_on_new_document(self, js, add_dom_wrapper=True, manage_socket=True, get_result=True): + async def add_script_to_evaluate_on_new_document(self, js: str, add_dom_wrapper: bool = True, manage_socket: bool = True, get_result: bool = True): """ How the underlying call functions is not particularly clear from the devtools docs, so stealing puppeteer's description: @@ -253,7 +263,7 @@ class Tab: await self.close_websocket() return res - async def remove_script_to_evaluate_on_new_document(self, script_id, manage_socket=True): + async def remove_script_to_evaluate_on_new_document(self, script_id: str, manage_socket: bool = True): """ Removes a script from a page that was added with `add_script_to_evaluate_on_new_document` @@ -267,7 +277,7 @@ class Tab: if manage_socket: await self.open_websocket() - res = await self._send_devtools_cmd({ + await self._send_devtools_cmd({ "method": "Page.removeScriptToEvaluateOnNewDocument", "params": { "identifier": script_id @@ -278,15 +288,16 @@ class Tab: if manage_socket: await self.close_websocket() - async def has_element(self, element_name, manage_socket=True): + async def has_element(self, element_name: str, manage_socket: bool = True): res = await self.evaluate_js(f"document.getElementById('{element_name}') != null", False, manage_socket) + assert res is not None if not "result" in res or not "result" in res["result"] or not "value" in res["result"]["result"]: return False return res["result"]["result"]["value"] - async def inject_css(self, style, manage_socket=True): + async def inject_css(self, style: str, manage_socket: bool = True): try: css_id = str(uuid.uuid4()) @@ -300,6 +311,8 @@ class Tab: }})() """, False, manage_socket) + assert result is not None + if "exceptionDetails" in result["result"]: return { "success": False, @@ -316,7 +329,7 @@ class Tab: "result": e } - async def remove_css(self, css_id, manage_socket=True): + async def remove_css(self, css_id: str, manage_socket: bool = True): try: result = await self.evaluate_js( f""" @@ -328,6 +341,8 @@ class Tab: }})() """, False, manage_socket) + assert result is not None + if "exceptionDetails" in result["result"]: return { "success": False, @@ -343,8 +358,9 @@ class Tab: "result": e } - async def get_steam_resource(self, url): + async def get_steam_resource(self, url: str): res = await self.evaluate_js(f'(async function test() {{ return await (await fetch("{url}")).text() }})()', True) + assert res is not None return res["result"]["result"]["value"] def __repr__(self): @@ -380,14 +396,14 @@ async def get_tabs() -> List[Tab]: raise Exception(f"/json did not return 200. {await res.text()}") -async def get_tab(tab_name) -> Tab: +async def get_tab(tab_name: str) -> Tab: tabs = await get_tabs() tab = next((i for i in tabs if i.title == tab_name), None) if not tab: raise ValueError(f"Tab {tab_name} not found") return tab -async def get_tab_lambda(test) -> Tab: +async def get_tab_lambda(test: Callable[[Tab], bool]) -> Tab: tabs = await get_tabs() tab = next((i for i in tabs if test(i)), None) if not tab: @@ -408,7 +424,7 @@ async def get_gamepadui_tab() -> Tab: raise ValueError(f"GamepadUI Tab not found") return tab -async def inject_to_tab(tab_name, js, run_async=False): +async def inject_to_tab(tab_name: str, js: str, run_async: bool = False): tab = await get_tab(tab_name) return await tab.evaluate_js(js, run_async) -- cgit v1.2.3