summaryrefslogtreecommitdiff
path: root/plugin_loader/injector.py
blob: edb095275b5a16b24aad6cc8d980535810d6b4eb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# Injector code from https://github.com/SteamDeckHomebrew/steamdeck-ui-inject. See that repository for more information on how it works.

from aiohttp import ClientSession
from logging import debug, getLogger
from asyncio import sleep

# Base URL of the HTTP endpoint that get_tabs() queries at "<BASE_ADDRESS>/json"
# for the list of tabs.
# NOTE(review): presumably Steam's remote-debugging endpoint — confirm the port.
BASE_ADDRESS = "http://localhost:8080"

# Logger used by this module, registered under the fixed name "Injector".
logger = getLogger("Injector")

class Tab:
    """One debuggable tab, built from an entry of the ``/json`` tab listing.

    Attributes are plain data copied from the listing entry, plus the
    ``client``/``websocket`` pair that ``open_websocket`` creates on demand.
    """

    def __init__(self, res) -> None:
        """Store the tab's title, id and websocket debugger URL.

        Args:
            res: Mapping with the keys ``title``, ``id`` and
                ``webSocketDebuggerUrl``.

        Raises:
            KeyError: If one of those keys is missing.
        """
        self.title = res["title"]
        self.id = res["id"]
        self.ws_url = res["webSocketDebuggerUrl"]

        # Both stay None until open_websocket() is called.
        self.websocket = None
        self.client = None

    async def open_websocket(self):
        """Create a new client session and connect it to ``self.ws_url``."""
        self.client = ClientSession()
        self.websocket = await self.client.ws_connect(self.ws_url)

    async def listen_for_message(self):
        """Yield every message received on the open websocket."""
        async for message in self.websocket:
            yield message

    async def _send_devtools_cmd(self, dc, receive=True):
        """Send the command ``dc`` as JSON and optionally wait for one reply.

        Args:
            dc: JSON-serialisable command payload.
            receive: When true, return the next JSON message from the socket;
                otherwise return ``None`` right after sending.

        Raises:
            RuntimeError: If no websocket has been opened.
        """
        if not self.websocket:
            raise RuntimeError("Websocket not opened")
        await self.websocket.send_json(dc)
        if not receive:
            return None
        return await self.websocket.receive_json()

    async def evaluate_js(self, js, run_async):
        """Run ``js`` in the tab through ``Runtime.evaluate`` and return the reply.

        A fresh session is opened for every call and is always closed again,
        including when connecting or the command itself fails.

        Args:
            js: JavaScript expression to evaluate.
            run_async: Passed as ``awaitPromise`` in the command parameters.

        Returns:
            The decoded JSON reply to the command.
        """
        try:
            await self.open_websocket()
            return await self._send_devtools_cmd({
                "id": 1,
                "method": "Runtime.evaluate",
                "params": {
                    "expression": js,
                    "userGesture": True,
                    "awaitPromise": run_async
                }
            })
        finally:
            # Close on every path so a failed connect/send cannot leak the session.
            if self.client is not None:
                await self.client.close()

    async def get_steam_resource(self, url):
        """Fetch ``url`` from inside the tab and return the response body text.

        Args:
            url: Address handed to the page's ``fetch``.

        Returns:
            The ``value`` field of the evaluation result.
        """
        import json

        # json.dumps emits a quoted, escaped literal, so quotes or backslashes
        # in ``url`` cannot break out of the JavaScript string.
        js_url = json.dumps(url)
        res = await self.evaluate_js(
            f'(async function test() {{ return await (await fetch({js_url})).text() }})()',
            True,
        )
        return res["result"]["result"]["value"]

    def __repr__(self):
        """Return the tab title."""
        return self.title

async def get_tabs():
    """Return a ``Tab`` for every entry listed at ``BASE_ADDRESS + "/json"``.

    The request is retried every 5 seconds until it succeeds.

    Returns:
        A list of ``Tab`` objects, one per entry of the JSON reply.

    Raises:
        Exception: If the endpoint answers with a status other than 200.
    """
    async with ClientSession() as web:
        while True:
            try:
                res = await web.get("{}/json".format(BASE_ADDRESS))
                break
            except Exception:
                # Deliberately not a bare ``except``: that would also swallow
                # asyncio.CancelledError / KeyboardInterrupt and make this
                # retry loop impossible to cancel.
                logger.info("Steam isn't available yet. Wait for a moment...")
                await sleep(5)

        if res.status != 200:
            raise Exception("/json did not return 200. {}".format(await res.text()))

        return [Tab(i) for i in await res.json()]

async def get_tab(tab_name):
    """Return the first tab whose title equals ``tab_name``.

    Raises:
        ValueError: If no tab returned by ``get_tabs`` has that title.
    """
    for candidate in await get_tabs():
        if candidate.title == tab_name:
            return candidate
    raise ValueError("Tab {} not found".format(tab_name))

async def inject_to_tab(tab_name, js, run_async=False):
    """Evaluate ``js`` in the tab titled ``tab_name`` and return the reply.

    ``run_async`` is forwarded unchanged to ``Tab.evaluate_js``. A
    ``ValueError`` from ``get_tab`` (tab not found) propagates to the caller.
    """
    target = await get_tab(tab_name)
    reply = await target.evaluate_js(js, run_async)
    return reply

async def tab_has_element(tab_name, element_name):
    """Report whether the tab titled ``tab_name`` has an element with the given id.

    Args:
        tab_name: Title of the tab to inspect.
        element_name: Element id passed to ``document.getElementById``.

    Returns:
        ``False`` when the tab does not exist or the reply carries no
        ``result.result.value``; otherwise that value from the reply.
    """
    import json

    try:
        tab = await get_tab(tab_name)
    except ValueError:
        return False

    # json.dumps yields a quoted, escaped JS string literal, so an id that
    # contains quotes or backslashes cannot break or alter the expression.
    js_id = json.dumps(element_name)
    res = await tab.evaluate_js(f"document.getElementById({js_id}) != null", False)

    try:
        return res["result"]["result"]["value"]
    except (KeyError, TypeError):
        # Reply lacks the expected nesting (e.g. an error response).
        return False