diff options
| author | K900 <me@0upti.me> | 2023-11-14 00:40:37 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2023-11-13 23:40:37 +0200 |
| commit | 5a633fdd8284dd1a2b6f3c95806f033ef4a4becf (patch) | |
| tree | b89f3660d3b8918484e6bc153003a84b95207045 /backend/src/injector.py | |
| parent | 8ce4a7679e9f0abb67e85822fefe08237ec9d82e (diff) | |
| download | decky-loader-5a633fdd8284dd1a2b6f3c95806f033ef4a4becf.tar.gz decky-loader-5a633fdd8284dd1a2b6f3c95806f033ef4a4becf.zip | |
Packaging rework (#531)marios8543/async-plugin-method-requests
* fix: get rid of title view jank on latest beta
* Count the number of installs for each plugin (#557)
* Bump aiohttp from 3.8.4 to 3.8.5 in /backend (#558)
* fix: include Decky version in request for index.js
This avoids the If-Modified-Since logic in aiohttp and ensures Steam doesn't cache old JS,
even if the timestamps are normalized.
* fix: clean up shellcheck warnings in act runner script
* fix: gitignore settings/
* fix: ensure state directories exist when running without the installer
* feat: determine root directory correctly when running from in-tree
* fix: fix typo in CI script
* refactor: build a proper Python package with poetry
* refactor: move decky_plugin under the poetry structure
There's no need to special case it anymore, just treat it like any other Python module.
* sandboxed_plugin: better fix, attempt 2
---------
Co-authored-by: AAGaming <aagaming@riseup.net>
Co-authored-by: Party Wumpus <48649272+PartyWumpus@users.noreply.github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Diffstat (limited to 'backend/src/injector.py')
| -rw-r--r-- | backend/src/injector.py | 438 |
1 files changed, 0 insertions, 438 deletions
diff --git a/backend/src/injector.py b/backend/src/injector.py deleted file mode 100644 index a217f689..00000000 --- a/backend/src/injector.py +++ /dev/null @@ -1,438 +0,0 @@ -# Injector code from https://github.com/SteamDeckHomebrew/steamdeck-ui-inject. More info on how it works there. - -from asyncio import sleep -from logging import getLogger -from typing import Any, Callable, List, TypedDict, Dict - -from aiohttp import ClientSession -from aiohttp.client_exceptions import ClientConnectorError, ClientOSError -from asyncio.exceptions import TimeoutError -import uuid - -BASE_ADDRESS = "http://localhost:8080" - -logger = getLogger("Injector") - -class _TabResponse(TypedDict): - title: str - id: str - url: str - webSocketDebuggerUrl: str - -class Tab: - cmd_id = 0 - - def __init__(self, res: _TabResponse) -> None: - self.title: str = res["title"] - self.id: str = res["id"] - self.url: str = res["url"] - self.ws_url: str = res["webSocketDebuggerUrl"] - - self.websocket = None - self.client = None - - async def open_websocket(self): - self.client = ClientSession() - self.websocket = await self.client.ws_connect(self.ws_url) # type: ignore - - async def close_websocket(self): - if self.websocket: - await self.websocket.close() - if self.client: - await self.client.close() - - async def listen_for_message(self): - if self.websocket: - async for message in self.websocket: - data = message.json() - yield data - logger.warn(f"The Tab {self.title} socket has been disconnected while listening for messages.") - await self.close_websocket() - - async def _send_devtools_cmd(self, dc: Dict[str, Any], receive: bool = True): - if self.websocket: - self.cmd_id += 1 - dc["id"] = self.cmd_id - await self.websocket.send_json(dc) - if receive: - async for msg in self.listen_for_message(): - if "id" in msg and msg["id"] == dc["id"]: - return msg - return None - raise RuntimeError("Websocket not opened") - - async def evaluate_js(self, js: str, run_async: bool | None = False, manage_socket: bool | None = True, get_result: bool = True): - try: - if manage_socket: - await self.open_websocket() - - res = await self._send_devtools_cmd({ - "method": "Runtime.evaluate", - "params": { - "expression": js, - "userGesture": True, - "awaitPromise": run_async - } - }, get_result) - - finally: - if manage_socket: - await self.close_websocket() - return res - - async def has_global_var(self, var_name: str, manage_socket: bool = True): - res = await self.evaluate_js(f"window['{var_name}'] !== null && window['{var_name}'] !== undefined", False, manage_socket) - assert res is not None - - if not "result" in res or not "result" in res["result"] or not "value" in res["result"]["result"]: - return False - - return res["result"]["result"]["value"] - - async def close(self, manage_socket: bool = True): - try: - if manage_socket: - await self.open_websocket() - - res = await self._send_devtools_cmd({ - "method": "Page.close", - }, False) - - finally: - if manage_socket: - await self.close_websocket() - return res - - async def enable(self): - """ - Enables page domain notifications. - """ - await self._send_devtools_cmd({ - "method": "Page.enable", - }, False) - - async def disable(self): - """ - Disables page domain notifications. - """ - await self._send_devtools_cmd({ - "method": "Page.disable", - }, False) - - async def refresh(self, manage_socket: bool = True): - try: - if manage_socket: - await self.open_websocket() - - await self._send_devtools_cmd({ - "method": "Page.reload", - }, False) - - finally: - if manage_socket: - await self.close_websocket() - - return - async def reload_and_evaluate(self, js: str, manage_socket: bool = True): - """ - Reloads the current tab, with JS to run on load via debugger - """ - try: - if manage_socket: - await self.open_websocket() - - await self._send_devtools_cmd({ - "method": "Debugger.enable" - }, True) - - await self._send_devtools_cmd({ - "method": "Runtime.evaluate", - "params": { - "expression": "location.reload();", - "userGesture": True, - "awaitPromise": False - } - }, False) - - breakpoint_res = await self._send_devtools_cmd({ - "method": "Debugger.setInstrumentationBreakpoint", - "params": { - "instrumentation": "beforeScriptExecution" - } - }, True) - - assert breakpoint_res is not None - - logger.info(breakpoint_res) - - # Page finishes loading when breakpoint hits - - for _ in range(20): - # this works around 1/5 of the time, so just send it 8 times. - # the js accounts for being injected multiple times allowing only one instance to run at a time anyway - await self._send_devtools_cmd({ - "method": "Runtime.evaluate", - "params": { - "expression": js, - "userGesture": True, - "awaitPromise": False - } - }, False) - - await self._send_devtools_cmd({ - "method": "Debugger.removeBreakpoint", - "params": { - "breakpointId": breakpoint_res["result"]["breakpointId"] - } - }, False) - - for _ in range(4): - await self._send_devtools_cmd({ - "method": "Debugger.resume" - }, False) - - await self._send_devtools_cmd({ - "method": "Debugger.disable" - }, True) - - finally: - if manage_socket: - await self.close_websocket() - return - - async def add_script_to_evaluate_on_new_document(self, js: str, add_dom_wrapper: bool = True, manage_socket: bool = True, get_result: bool = True): - """ - How the underlying call functions is not particularly clear from the devtools docs, so stealing puppeteer's description: - - Adds a function which would be invoked in one of the following scenarios: - * whenever the page is navigated - * whenever the child frame is attached or navigated. In this case, the - function is invoked in the context of the newly attached frame. - - The function is invoked after the document was created but before any of - its scripts were run. This is useful to amend the JavaScript environment, - e.g. to seed `Math.random`. - - Parameters - ---------- - js : str - The script to evaluate on new document - add_dom_wrapper : bool - True to wrap the script in a wait for the 'DOMContentLoaded' event. - DOM will usually not exist when this execution happens, - so it is necessary to delay til DOM is loaded if you are modifying it - manage_socket : bool - True to have this function handle opening/closing the websocket for this tab - get_result : bool - True to wait for the result of this call - - Returns - ------- - int or None - The identifier of the script added, used to remove it later. - (see remove_script_to_evaluate_on_new_document below) - None is returned if `get_result` is False - """ - try: - - wrappedjs = """ - function scriptFunc() { - {js} - } - if (document.readyState === 'loading') { - addEventListener('DOMContentLoaded', () => { - scriptFunc(); - }); - } else { - scriptFunc(); - } - """.format(js=js) if add_dom_wrapper else js - - if manage_socket: - await self.open_websocket() - - res = await self._send_devtools_cmd({ - "method": "Page.addScriptToEvaluateOnNewDocument", - "params": { - "source": wrappedjs - } - }, get_result) - - finally: - if manage_socket: - await self.close_websocket() - return res - - async def remove_script_to_evaluate_on_new_document(self, script_id: str, manage_socket: bool = True): - """ - Removes a script from a page that was added with `add_script_to_evaluate_on_new_document` - - Parameters - ---------- - script_id : int - The identifier of the script to remove (returned from `add_script_to_evaluate_on_new_document`) - """ - - try: - if manage_socket: - await self.open_websocket() - - await self._send_devtools_cmd({ - "method": "Page.removeScriptToEvaluateOnNewDocument", - "params": { - "identifier": script_id - } - }, False) - - finally: - if manage_socket: - await self.close_websocket() - - async def has_element(self, element_name: str, manage_socket: bool = True): - res = await self.evaluate_js(f"document.getElementById('{element_name}') != null", False, manage_socket) - assert res is not None - - if not "result" in res or not "result" in res["result"] or not "value" in res["result"]["result"]: - return False - - return res["result"]["result"]["value"] - - async def inject_css(self, style: str, manage_socket: bool = True): - try: - css_id = str(uuid.uuid4()) - - result = await self.evaluate_js( - f""" - (function() {{ - const style = document.createElement('style'); - style.id = "{css_id}"; - document.head.append(style); - style.textContent = `{style}`; - }})() - """, False, manage_socket) - - assert result is not None - - if "exceptionDetails" in result["result"]: - return { - "success": False, - "result": result["result"] - } - - return { - "success": True, - "result": css_id - } - except Exception as e: - return { - "success": False, - "result": e - } - - async def remove_css(self, css_id: str, manage_socket: bool = True): - try: - result = await self.evaluate_js( - f""" - (function() {{ - let style = document.getElementById("{css_id}"); - - if (style.nodeName.toLowerCase() == 'style') - style.parentNode.removeChild(style); - }})() - """, False, manage_socket) - - assert result is not None - - if "exceptionDetails" in result["result"]: - return { - "success": False, - "result": result - } - - return { - "success": True - } - except Exception as e: - return { - "success": False, - "result": e - } - - async def get_steam_resource(self, url: str): - res = await self.evaluate_js(f'(async function test() {{ return await (await fetch("{url}")).text() }})()', True) - assert res is not None - return res["result"]["result"]["value"] - - def __repr__(self): - return self.title - - -async def get_tabs() -> List[Tab]: - res = {} - - na = False - while True: - try: - async with ClientSession() as web: - res = await web.get(f"{BASE_ADDRESS}/json", timeout=3) - except ClientConnectorError: - if not na: - logger.debug("Steam isn't available yet. Wait for a moment...") - na = True - await sleep(5) - except ClientOSError: - logger.warn(f"The request to {BASE_ADDRESS}/json was reset") - await sleep(1) - except TimeoutError: - logger.warn(f"The request to {BASE_ADDRESS}/json timed out") - await sleep(1) - else: - break - - if res.status == 200: - r = await res.json() - return [Tab(i) for i in r] - else: - raise Exception(f"/json did not return 200. {await res.text()}") - - -async def get_tab(tab_name: str) -> Tab: - tabs = await get_tabs() - tab = next((i for i in tabs if i.title == tab_name), None) - if not tab: - raise ValueError(f"Tab {tab_name} not found") - return tab - -async def get_tab_lambda(test: Callable[[Tab], bool]) -> Tab: - tabs = await get_tabs() - tab = next((i for i in tabs if test(i)), None) - if not tab: - raise ValueError(f"Tab not found by lambda") - return tab - -SHARED_CTX_NAMES = ["SharedJSContext", "Steam Shared Context presented by Valveā¢", "Steam", "SP"] -CLOSEABLE_URLS = ["about:blank", "data:text/html,%3Cbody%3E%3C%2Fbody%3E"] # Closing anything other than these *really* likes to crash Steam -DO_NOT_CLOSE_URL = "Valve Steam Gamepad/default" # Steam Big Picture Mode tab - -def tab_is_gamepadui(t: Tab) -> bool: - return "https://steamloopback.host/routes/" in t.url and t.title in SHARED_CTX_NAMES - -async def get_gamepadui_tab() -> Tab: - tabs = await get_tabs() - tab = next((i for i in tabs if tab_is_gamepadui(i)), None) - if not tab: - raise ValueError(f"GamepadUI Tab not found") - return tab - -async def inject_to_tab(tab_name: str, js: str, run_async: bool = False): - tab = await get_tab(tab_name) - - return await tab.evaluate_js(js, run_async) - -async def close_old_tabs(): - tabs = await get_tabs() - for t in tabs: - if not t.title or (t.title not in SHARED_CTX_NAMES and any(url in t.url for url in CLOSEABLE_URLS) and DO_NOT_CLOSE_URL not in t.url): - logger.debug("Closing tab: " + getattr(t, "title", "Untitled")) - await t.close() - await sleep(0.5) |
