diff options
| author | TrainDoctor <traindoctor@protonmail.com> | 2023-10-25 19:47:33 -0700 |
|---|---|---|
| committer | TrainDoctor <traindoctor@protonmail.com> | 2023-10-25 19:47:33 -0700 |
| commit | a7669799bca3ff4089ab81fde924b2d2f787cf0f (patch) | |
| tree | 0afcb03ec1c01efac90be81674df649d6cef186e /backend/src/injector.py | |
| parent | dacd2c19eb51ba525288fccb3ded653e45ca4409 (diff) | |
| download | decky-loader-a7669799bca3ff4089ab81fde924b2d2f787cf0f.tar.gz decky-loader-a7669799bca3ff4089ab81fde924b2d2f787cf0f.zip | |
Merge aa/type-cleanup-py (work by marios, aa, wolv)v2.10.6-pre1
Diffstat (limited to 'backend/src/injector.py')
| -rw-r--r-- | backend/src/injector.py | 438 |
1 files changed, 438 insertions, 0 deletions
diff --git a/backend/src/injector.py b/backend/src/injector.py new file mode 100644 index 00000000..a217f689 --- /dev/null +++ b/backend/src/injector.py @@ -0,0 +1,438 @@ +# Injector code from https://github.com/SteamDeckHomebrew/steamdeck-ui-inject. More info on how it works there. + +from asyncio import sleep +from logging import getLogger +from typing import Any, Callable, List, TypedDict, Dict + +from aiohttp import ClientSession +from aiohttp.client_exceptions import ClientConnectorError, ClientOSError +from asyncio.exceptions import TimeoutError +import uuid + +BASE_ADDRESS = "http://localhost:8080" + +logger = getLogger("Injector") + +class _TabResponse(TypedDict): + title: str + id: str + url: str + webSocketDebuggerUrl: str + +class Tab: + cmd_id = 0 + + def __init__(self, res: _TabResponse) -> None: + self.title: str = res["title"] + self.id: str = res["id"] + self.url: str = res["url"] + self.ws_url: str = res["webSocketDebuggerUrl"] + + self.websocket = None + self.client = None + + async def open_websocket(self): + self.client = ClientSession() + self.websocket = await self.client.ws_connect(self.ws_url) # type: ignore + + async def close_websocket(self): + if self.websocket: + await self.websocket.close() + if self.client: + await self.client.close() + + async def listen_for_message(self): + if self.websocket: + async for message in self.websocket: + data = message.json() + yield data + logger.warn(f"The Tab {self.title} socket has been disconnected while listening for messages.") + await self.close_websocket() + + async def _send_devtools_cmd(self, dc: Dict[str, Any], receive: bool = True): + if self.websocket: + self.cmd_id += 1 + dc["id"] = self.cmd_id + await self.websocket.send_json(dc) + if receive: + async for msg in self.listen_for_message(): + if "id" in msg and msg["id"] == dc["id"]: + return msg + return None + raise RuntimeError("Websocket not opened") + + async def evaluate_js(self, js: str, run_async: bool | None = False, manage_socket: bool | None = True, get_result: bool = True): + try: + if manage_socket: + await self.open_websocket() + + res = await self._send_devtools_cmd({ + "method": "Runtime.evaluate", + "params": { + "expression": js, + "userGesture": True, + "awaitPromise": run_async + } + }, get_result) + + finally: + if manage_socket: + await self.close_websocket() + return res + + async def has_global_var(self, var_name: str, manage_socket: bool = True): + res = await self.evaluate_js(f"window['{var_name}'] !== null && window['{var_name}'] !== undefined", False, manage_socket) + assert res is not None + + if not "result" in res or not "result" in res["result"] or not "value" in res["result"]["result"]: + return False + + return res["result"]["result"]["value"] + + async def close(self, manage_socket: bool = True): + try: + if manage_socket: + await self.open_websocket() + + res = await self._send_devtools_cmd({ + "method": "Page.close", + }, False) + + finally: + if manage_socket: + await self.close_websocket() + return res + + async def enable(self): + """ + Enables page domain notifications. + """ + await self._send_devtools_cmd({ + "method": "Page.enable", + }, False) + + async def disable(self): + """ + Disables page domain notifications. + """ + await self._send_devtools_cmd({ + "method": "Page.disable", + }, False) + + async def refresh(self, manage_socket: bool = True): + try: + if manage_socket: + await self.open_websocket() + + await self._send_devtools_cmd({ + "method": "Page.reload", + }, False) + + finally: + if manage_socket: + await self.close_websocket() + + return + async def reload_and_evaluate(self, js: str, manage_socket: bool = True): + """ + Reloads the current tab, with JS to run on load via debugger + """ + try: + if manage_socket: + await self.open_websocket() + + await self._send_devtools_cmd({ + "method": "Debugger.enable" + }, True) + + await self._send_devtools_cmd({ + "method": "Runtime.evaluate", + "params": { + "expression": "location.reload();", + "userGesture": True, + "awaitPromise": False + } + }, False) + + breakpoint_res = await self._send_devtools_cmd({ + "method": "Debugger.setInstrumentationBreakpoint", + "params": { + "instrumentation": "beforeScriptExecution" + } + }, True) + + assert breakpoint_res is not None + + logger.info(breakpoint_res) + + # Page finishes loading when breakpoint hits + + for _ in range(20): + # this works around 1/5 of the time, so just send it 8 times. + # the js accounts for being injected multiple times allowing only one instance to run at a time anyway + await self._send_devtools_cmd({ + "method": "Runtime.evaluate", + "params": { + "expression": js, + "userGesture": True, + "awaitPromise": False + } + }, False) + + await self._send_devtools_cmd({ + "method": "Debugger.removeBreakpoint", + "params": { + "breakpointId": breakpoint_res["result"]["breakpointId"] + } + }, False) + + for _ in range(4): + await self._send_devtools_cmd({ + "method": "Debugger.resume" + }, False) + + await self._send_devtools_cmd({ + "method": "Debugger.disable" + }, True) + + finally: + if manage_socket: + await self.close_websocket() + return + + async def add_script_to_evaluate_on_new_document(self, js: str, add_dom_wrapper: bool = True, manage_socket: bool = True, get_result: bool = True): + """ + How the underlying call functions is not particularly clear from the devtools docs, so stealing puppeteer's description: + + Adds a function which would be invoked in one of the following scenarios: + * whenever the page is navigated + * whenever the child frame is attached or navigated. In this case, the + function is invoked in the context of the newly attached frame. + + The function is invoked after the document was created but before any of + its scripts were run. This is useful to amend the JavaScript environment, + e.g. to seed `Math.random`. + + Parameters + ---------- + js : str + The script to evaluate on new document + add_dom_wrapper : bool + True to wrap the script in a wait for the 'DOMContentLoaded' event. + DOM will usually not exist when this execution happens, + so it is necessary to delay til DOM is loaded if you are modifying it + manage_socket : bool + True to have this function handle opening/closing the websocket for this tab + get_result : bool + True to wait for the result of this call + + Returns + ------- + int or None + The identifier of the script added, used to remove it later. + (see remove_script_to_evaluate_on_new_document below) + None is returned if `get_result` is False + """ + try: + + wrappedjs = """ + function scriptFunc() { + {js} + } + if (document.readyState === 'loading') { + addEventListener('DOMContentLoaded', () => { + scriptFunc(); + }); + } else { + scriptFunc(); + } + """.format(js=js) if add_dom_wrapper else js + + if manage_socket: + await self.open_websocket() + + res = await self._send_devtools_cmd({ + "method": "Page.addScriptToEvaluateOnNewDocument", + "params": { + "source": wrappedjs + } + }, get_result) + + finally: + if manage_socket: + await self.close_websocket() + return res + + async def remove_script_to_evaluate_on_new_document(self, script_id: str, manage_socket: bool = True): + """ + Removes a script from a page that was added with `add_script_to_evaluate_on_new_document` + + Parameters + ---------- + script_id : int + The identifier of the script to remove (returned from `add_script_to_evaluate_on_new_document`) + """ + + try: + if manage_socket: + await self.open_websocket() + + await self._send_devtools_cmd({ + "method": "Page.removeScriptToEvaluateOnNewDocument", + "params": { + "identifier": script_id + } + }, False) + + finally: + if manage_socket: + await self.close_websocket() + + async def has_element(self, element_name: str, manage_socket: bool = True): + res = await self.evaluate_js(f"document.getElementById('{element_name}') != null", False, manage_socket) + assert res is not None + + if not "result" in res or not "result" in res["result"] or not "value" in res["result"]["result"]: + return False + + return res["result"]["result"]["value"] + + async def inject_css(self, style: str, manage_socket: bool = True): + try: + css_id = str(uuid.uuid4()) + + result = await self.evaluate_js( + f""" + (function() {{ + const style = document.createElement('style'); + style.id = "{css_id}"; + document.head.append(style); + style.textContent = `{style}`; + }})() + """, False, manage_socket) + + assert result is not None + + if "exceptionDetails" in result["result"]: + return { + "success": False, + "result": result["result"] + } + + return { + "success": True, + "result": css_id + } + except Exception as e: + return { + "success": False, + "result": e + } + + async def remove_css(self, css_id: str, manage_socket: bool = True): + try: + result = await self.evaluate_js( + f""" + (function() {{ + let style = document.getElementById("{css_id}"); + + if (style.nodeName.toLowerCase() == 'style') + style.parentNode.removeChild(style); + }})() + """, False, manage_socket) + + assert result is not None + + if "exceptionDetails" in result["result"]: + return { + "success": False, + "result": result + } + + return { + "success": True + } + except Exception as e: + return { + "success": False, + "result": e + } + + async def get_steam_resource(self, url: str): + res = await self.evaluate_js(f'(async function test() {{ return await (await fetch("{url}")).text() }})()', True) + assert res is not None + return res["result"]["result"]["value"] + + def __repr__(self): + return self.title + + +async def get_tabs() -> List[Tab]: + res = {} + + na = False + while True: + try: + async with ClientSession() as web: + res = await web.get(f"{BASE_ADDRESS}/json", timeout=3) + except ClientConnectorError: + if not na: + logger.debug("Steam isn't available yet. Wait for a moment...") + na = True + await sleep(5) + except ClientOSError: + logger.warn(f"The request to {BASE_ADDRESS}/json was reset") + await sleep(1) + except TimeoutError: + logger.warn(f"The request to {BASE_ADDRESS}/json timed out") + await sleep(1) + else: + break + + if res.status == 200: + r = await res.json() + return [Tab(i) for i in r] + else: + raise Exception(f"/json did not return 200. {await res.text()}") + + +async def get_tab(tab_name: str) -> Tab: + tabs = await get_tabs() + tab = next((i for i in tabs if i.title == tab_name), None) + if not tab: + raise ValueError(f"Tab {tab_name} not found") + return tab + +async def get_tab_lambda(test: Callable[[Tab], bool]) -> Tab: + tabs = await get_tabs() + tab = next((i for i in tabs if test(i)), None) + if not tab: + raise ValueError(f"Tab not found by lambda") + return tab + +SHARED_CTX_NAMES = ["SharedJSContext", "Steam Shared Context presented by Valveā¢", "Steam", "SP"] +CLOSEABLE_URLS = ["about:blank", "data:text/html,%3Cbody%3E%3C%2Fbody%3E"] # Closing anything other than these *really* likes to crash Steam +DO_NOT_CLOSE_URL = "Valve Steam Gamepad/default" # Steam Big Picture Mode tab + +def tab_is_gamepadui(t: Tab) -> bool: + return "https://steamloopback.host/routes/" in t.url and t.title in SHARED_CTX_NAMES + +async def get_gamepadui_tab() -> Tab: + tabs = await get_tabs() + tab = next((i for i in tabs if tab_is_gamepadui(i)), None) + if not tab: + raise ValueError(f"GamepadUI Tab not found") + return tab + +async def inject_to_tab(tab_name: str, js: str, run_async: bool = False): + tab = await get_tab(tab_name) + + return await tab.evaluate_js(js, run_async) + +async def close_old_tabs(): + tabs = await get_tabs() + for t in tabs: + if not t.title or (t.title not in SHARED_CTX_NAMES and any(url in t.url for url in CLOSEABLE_URLS) and DO_NOT_CLOSE_URL not in t.url): + logger.debug("Closing tab: " + getattr(t, "title", "Untitled")) + await t.close() + await sleep(0.5) |
