summaryrefslogtreecommitdiff
path: root/backend/injector.py
diff options
context:
space:
mode:
Diffstat (limited to 'backend/injector.py')
-rw-r--r--backend/injector.py314
1 files changed, 185 insertions, 129 deletions
diff --git a/backend/injector.py b/backend/injector.py
index d77de13a..131dbc4a 100644
--- a/backend/injector.py
+++ b/backend/injector.py
@@ -2,10 +2,9 @@
from asyncio import sleep
from logging import getLogger
-from traceback import format_exc
from typing import List
-from aiohttp import ClientSession, WSMsgType
+from aiohttp import ClientSession
from aiohttp.client_exceptions import ClientConnectorError, ClientOSError
from asyncio.exceptions import TimeoutError
import uuid
@@ -39,9 +38,12 @@ class Tab:
async for message in self.websocket:
data = message.json()
yield data
- logger.warn(f"The Tab {self.title} socket has been disconnected while listening for messages.")
+ logger.warn(
+ f"The Tab {self.title} socket has been disconnected while listening for"
+ " messages."
+ )
await self.close_websocket()
-
+
async def _send_devtools_cmd(self, dc, receive=True):
if self.websocket:
self.cmd_id += 1
@@ -54,19 +56,24 @@ class Tab:
return None
raise RuntimeError("Websocket not opened")
- async def evaluate_js(self, js, run_async=False, manage_socket=True, get_result=True):
+ async def evaluate_js(
+ self, js, run_async=False, manage_socket=True, get_result=True
+ ):
try:
if manage_socket:
await self.open_websocket()
- res = await self._send_devtools_cmd({
- "method": "Runtime.evaluate",
- "params": {
- "expression": js,
- "userGesture": True,
- "awaitPromise": run_async
- }
- }, get_result)
+ res = await self._send_devtools_cmd(
+ {
+ "method": "Runtime.evaluate",
+ "params": {
+ "expression": js,
+ "userGesture": True,
+ "awaitPromise": run_async,
+ },
+ },
+ get_result,
+ )
finally:
if manage_socket:
@@ -74,9 +81,17 @@ class Tab:
return res
async def has_global_var(self, var_name, manage_socket=True):
- res = await self.evaluate_js(f"window['{var_name}'] !== null && window['{var_name}'] !== undefined", False, manage_socket)
-
- if not "result" in res or not "result" in res["result"] or not "value" in res["result"]["result"]:
+ res = await self.evaluate_js(
+ f"window['{var_name}'] !== null && window['{var_name}'] !== undefined",
+ False,
+ manage_socket,
+ )
+
+ if (
+ "result" not in res
+ or "result" not in res["result"]
+ or "value" not in res["result"]["result"]
+ ):
return False
return res["result"]["result"]["value"]
@@ -86,9 +101,12 @@ class Tab:
if manage_socket:
await self.open_websocket()
- res = await self._send_devtools_cmd({
- "method": "Page.close",
- }, False)
+ res = await self._send_devtools_cmd(
+ {
+ "method": "Page.close",
+ },
+ False,
+ )
finally:
if manage_socket:
@@ -99,32 +117,42 @@ class Tab:
"""
Enables page domain notifications.
"""
- await self._send_devtools_cmd({
- "method": "Page.enable",
- }, False)
+ await self._send_devtools_cmd(
+ {
+ "method": "Page.enable",
+ },
+ False,
+ )
async def disable(self):
"""
Disables page domain notifications.
"""
- await self._send_devtools_cmd({
- "method": "Page.disable",
- }, False)
-
- async def refresh(self):
+ await self._send_devtools_cmd(
+ {
+ "method": "Page.disable",
+ },
+ False,
+ )
+
+ async def refresh(self, manage_socket=False):
try:
if manage_socket:
await self.open_websocket()
- await self._send_devtools_cmd({
- "method": "Page.reload",
- }, False)
+ await self._send_devtools_cmd(
+ {
+ "method": "Page.reload",
+ },
+ False,
+ )
finally:
if manage_socket:
await self.close_websocket()
return
+
async def reload_and_evaluate(self, js, manage_socket=True):
"""
Reloads the current tab, with JS to run on load via debugger
@@ -133,64 +161,70 @@ class Tab:
if manage_socket:
await self.open_websocket()
- await self._send_devtools_cmd({
- "method": "Debugger.enable"
- }, True)
-
- await self._send_devtools_cmd({
- "method": "Runtime.evaluate",
- "params": {
- "expression": "location.reload();",
- "userGesture": True,
- "awaitPromise": False
- }
- }, False)
-
- breakpoint_res = await self._send_devtools_cmd({
- "method": "Debugger.setInstrumentationBreakpoint",
- "params": {
- "instrumentation": "beforeScriptExecution"
- }
- }, True)
+ await self._send_devtools_cmd({"method": "Debugger.enable"}, True)
+
+ await self._send_devtools_cmd(
+ {
+ "method": "Runtime.evaluate",
+ "params": {
+ "expression": "location.reload();",
+ "userGesture": True,
+ "awaitPromise": False,
+ },
+ },
+ False,
+ )
+
+ breakpoint_res = await self._send_devtools_cmd(
+ {
+ "method": "Debugger.setInstrumentationBreakpoint",
+ "params": {"instrumentation": "beforeScriptExecution"},
+ },
+ True,
+ )
logger.info(breakpoint_res)
-
+
# Page finishes loading when breakpoint hits
for x in range(20):
# this works around 1/5 of the time, so just send it 8 times.
# the js accounts for being injected multiple times allowing only one instance to run at a time anyway
- await self._send_devtools_cmd({
- "method": "Runtime.evaluate",
+ await self._send_devtools_cmd(
+ {
+ "method": "Runtime.evaluate",
+ "params": {
+ "expression": js,
+ "userGesture": True,
+ "awaitPromise": False,
+ },
+ },
+ False,
+ )
+
+ await self._send_devtools_cmd(
+ {
+ "method": "Debugger.removeBreakpoint",
"params": {
- "expression": js,
- "userGesture": True,
- "awaitPromise": False
- }
- }, False)
-
- await self._send_devtools_cmd({
- "method": "Debugger.removeBreakpoint",
- "params": {
- "breakpointId": breakpoint_res["result"]["breakpointId"]
- }
- }, False)
+ "breakpointId": breakpoint_res["result"]["breakpointId"]
+ },
+ },
+ False,
+ )
for x in range(4):
- await self._send_devtools_cmd({
- "method": "Debugger.resume"
- }, False)
+ await self._send_devtools_cmd({"method": "Debugger.resume"}, False)
- await self._send_devtools_cmd({
- "method": "Debugger.disable"
- }, True)
+ await self._send_devtools_cmd({"method": "Debugger.disable"}, True)
finally:
if manage_socket:
await self.close_websocket()
return
- async def add_script_to_evaluate_on_new_document(self, js, add_dom_wrapper=True, manage_socket=True, get_result=True):
+ async def add_script_to_evaluate_on_new_document(
+ self, js, add_dom_wrapper=True, manage_socket=True, get_result=True
+ ):
"""
How the underlying call functions is not particularly clear from the devtools docs, so stealing puppeteer's description:
@@ -225,35 +259,44 @@ class Tab:
"""
try:
- wrappedjs = """
- function scriptFunc() {
+ wrappedjs = (
+ """
+ function scriptFunc() {{
{js}
- }
- if (document.readyState === 'loading') {
- addEventListener('DOMContentLoaded', () => {
+ }}
+ if (document.readyState === 'loading') {{
+ addEventListener('DOMContentLoaded', () => {{
scriptFunc();
- });
- } else {
+ }});
+ }} else {{
scriptFunc();
- }
- """.format(js=js) if add_dom_wrapper else js
+ }}
+ """.format(
+ js=js
+ )
+ if add_dom_wrapper
+ else js
+ )
if manage_socket:
await self.open_websocket()
- res = await self._send_devtools_cmd({
- "method": "Page.addScriptToEvaluateOnNewDocument",
- "params": {
- "source": wrappedjs
- }
- }, get_result)
+ res = await self._send_devtools_cmd(
+ {
+ "method": "Page.addScriptToEvaluateOnNewDocument",
+ "params": {"source": wrappedjs},
+ },
+ get_result,
+ )
finally:
if manage_socket:
await self.close_websocket()
return res
- async def remove_script_to_evaluate_on_new_document(self, script_id, manage_socket=True):
+ async def remove_script_to_evaluate_on_new_document(
+ self, script_id, manage_socket=True
+ ):
"""
Removes a script from a page that was added with `add_script_to_evaluate_on_new_document`
@@ -267,21 +310,28 @@ class Tab:
if manage_socket:
await self.open_websocket()
- res = await self._send_devtools_cmd({
- "method": "Page.removeScriptToEvaluateOnNewDocument",
- "params": {
- "identifier": script_id
- }
- }, False)
+ await self._send_devtools_cmd(
+ {
+ "method": "Page.removeScriptToEvaluateOnNewDocument",
+ "params": {"identifier": script_id},
+ },
+ False,
+ )
finally:
if manage_socket:
await self.close_websocket()
async def has_element(self, element_name, manage_socket=True):
- res = await self.evaluate_js(f"document.getElementById('{element_name}') != null", False, manage_socket)
-
- if not "result" in res or not "result" in res["result"] or not "value" in res["result"]["result"]:
+ res = await self.evaluate_js(
+ f"document.getElementById('{element_name}') != null", False, manage_socket
+ )
+
+ if (
+ "result" not in res
+ or "result" not in res["result"]
+ or "value" not in res["result"]["result"]
+ ):
return False
return res["result"]["result"]["value"]
@@ -298,23 +348,17 @@ class Tab:
document.head.append(style);
style.textContent = `{style}`;
}})()
- """, False, manage_socket)
+ """,
+ False,
+ manage_socket,
+ )
if "exceptionDetails" in result["result"]:
- return {
- "success": False,
- "result": result["result"]
- }
-
- return {
- "success": True,
- "result": css_id
- }
+ return {"success": False, "result": result["result"]}
+
+ return {"success": True, "result": css_id}
except Exception as e:
- return {
- "success": False,
- "result": e
- }
+ return {"success": False, "result": e}
async def remove_css(self, css_id, manage_socket=True):
try:
@@ -326,25 +370,24 @@ class Tab:
if (style.nodeName.toLowerCase() == 'style')
style.parentNode.removeChild(style);
}})()
- """, False, manage_socket)
+ """,
+ False,
+ manage_socket,
+ )
if "exceptionDetails" in result["result"]:
- return {
- "success": False,
- "result": result
- }
-
- return {
- "success": True
- }
+ return {"success": False, "result": result}
+
+ return {"success": True}
except Exception as e:
- return {
- "success": False,
- "result": e
- }
+ return {"success": False, "result": e}
async def get_steam_resource(self, url):
- res = await self.evaluate_js(f'(async function test() {{ return await (await fetch("{url}")).text() }})()', True)
+ res = await self.evaluate_js(
+ f'(async function test() {{ return await (await fetch("{url}")).text()'
+ " })()",
+ True,
+ )
return res["result"]["result"]["value"]
def __repr__(self):
@@ -387,32 +430,45 @@ async def get_tab(tab_name) -> Tab:
raise ValueError(f"Tab {tab_name} not found")
return tab
+
async def get_tab_lambda(test) -> Tab:
tabs = await get_tabs()
tab = next((i for i in tabs if test(i)), None)
if not tab:
- raise ValueError(f"Tab not found by lambda")
+ raise ValueError("Tab not found by lambda")
return tab
+
def tab_is_gamepadui(t: Tab) -> bool:
- return "https://steamloopback.host/routes/" in t.url and (t.title == "Steam Shared Context presented by Valve™" or t.title == "Steam" or t.title == "SP")
+ return "https://steamloopback.host/routes/" in t.url and (
+ t.title == "Steam Shared Context presented by Valve™"
+ or t.title == "Steam"
+ or t.title == "SP"
+ )
+
async def get_gamepadui_tab() -> Tab:
tabs = await get_tabs()
tab = next((i for i in tabs if tab_is_gamepadui(i)), None)
if not tab:
- raise ValueError(f"GamepadUI Tab not found")
+ raise ValueError("GamepadUI Tab not found")
return tab
+
async def inject_to_tab(tab_name, js, run_async=False):
tab = await get_tab(tab_name)
return await tab.evaluate_js(js, run_async)
+
async def close_old_tabs():
tabs = await get_tabs()
for t in tabs:
- if not t.title or (t.title != "Steam Shared Context presented by Valve™" and t.title != "Steam" and t.title != "SP"):
+ if not t.title or (
+ t.title != "Steam Shared Context presented by Valve™"
+ and t.title != "Steam"
+ and t.title != "SP"
+ ):
logger.debug("Closing tab: " + getattr(t, "title", "Untitled"))
await t.close()
- await sleep(0.5) \ No newline at end of file
+ await sleep(0.5)