summaryrefslogtreecommitdiff
path: root/plugin_loader/main.py
blob: 48f45231e51fb5ca9ec72b88f15874c75d7d38de (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
from logging import getLogger, basicConfig, INFO, DEBUG, Filter, root
from os import getenv

CONFIG = {
    "plugin_path": getenv("PLUGIN_PATH", "/home/deck/homebrew/plugins"),
    "server_host": getenv("SERVER_HOST", "127.0.0.1"),
    "server_port": int(getenv("SERVER_PORT", "1337")),
    "live_reload": getenv("LIVE_RELOAD", "1") == "1",
    "log_level": {"CRITICAL": 50, "ERROR": 40, "WARNING":30, "INFO": 20, "DEBUG": 10}[getenv("LOG_LEVEL", "INFO")],
    "store_url": getenv("STORE_URL", "https://plugins.deckbrew.xyz"),
    "log_base_events": getenv("LOG_BASE_EVENTS", "0")=="1"
}

class NoBaseEvents(Filter):
    def filter(self, record):
        return not "asyncio" in record.name

basicConfig(level=CONFIG["log_level"], format="[%(module)s][%(levelname)s]: %(message)s")
for handler in root.handlers:
    if not CONFIG["log_base_events"]:
        handler.addFilter(NoBaseEvents())

from aiohttp.web import Application, run_app, static
from aiohttp_jinja2 import setup as jinja_setup
from jinja2 import FileSystemLoader
from os import path
from asyncio import get_event_loop, sleep
from json import loads, dumps
from subprocess import Popen

from loader import Loader
from injector import inject_to_tab, get_tab, tab_has_element
from utilities import Utilities
from browser import PluginBrowser

logger = getLogger("Main")
from traceback import print_exc

async def chown_plugin_dir(_):
    Popen(["chown", "-R", "deck:deck", CONFIG["plugin_path"]])
    Popen(["chmod", "-R", "555", CONFIG["plugin_path"]])

class PluginManager:
    def __init__(self) -> None:
        self.loop = get_event_loop()
        self.web_app = Application()
        self.plugin_loader = Loader(self.web_app, CONFIG["plugin_path"], self.loop, CONFIG["live_reload"])
        self.plugin_browser = PluginBrowser(CONFIG["plugin_path"], self.web_app, CONFIG["store_url"])
        self.utilities = Utilities(self)

        jinja_setup(self.web_app, loader=FileSystemLoader(path.join(path.dirname(__file__), 'templates')))
        self.web_app.on_startup.append(self.inject_javascript)
        self.web_app.on_startup.append(chown_plugin_dir)
        self.web_app.add_routes([static("/static", path.join(path.dirname(__file__), 'static'))])
        self.loop.create_task(self.method_call_listener())
        self.loop.create_task(self.loader_reinjector())

        self.loop.set_exception_handler(self.exception_handler)

    def exception_handler(self, loop, context):
        if context["message"] == "Unclosed connection":
            return
        loop.default_exception_handler(context)
                
    async def loader_reinjector(self):
        finished_reinjection = False
        logger.info("Plugin loader isn't present in Steam anymore, reinjecting...")
        while True:
            await sleep(1)
            if not await tab_has_element("QuickAccess", "plugin_iframe"):
                logger.debug("Plugin loader isn't present in Steam anymore, reinjecting...")
                await self.inject_javascript()
                finished_reinjection = True
            elif finished_reinjection:
                finished_reinjection = False
                logger.info("Reinjecting successful!")

                self.loop.create_task(self.method_call_listener())

    async def inject_javascript(self, request=None):
        try:
            await inject_to_tab("QuickAccess", open(path.join(path.dirname(__file__), "static/library.js"), "r").read())
            await inject_to_tab("QuickAccess", open(path.join(path.dirname(__file__), "static/plugin_page.js"), "r").read())
        except:
            logger.info("Failed to inject JavaScript into tab")
            pass

    async def resolve_method_call(self, tab, call_id, response):
        try:
            r = dumps(response)
        except Exception as e:
            logger.error(response["result"])
            response["result"] = str(response["result"])
            r = response
        await tab._send_devtools_cmd({
            "id": 1,
            "method": "Runtime.evaluate",
            "params": {
                "expression": f"resolveMethodCall('{call_id}', {r})",
                "userGesture": True
            }
        }, receive=False)

    async def handle_method_call(self, method, tab):
        res = {}
        try:
            if method["method"] == "plugin_method":
                res["result"] = await self.plugin_loader.handle_plugin_method_call(
                    method["args"]["plugin_name"],
                    method["args"]["method_name"],
                    **method["args"]["args"]
                )
                res["success"] = True
            else:
                r = await self.utilities.util_methods[method["method"]](**method["args"])
                res["result"] = r
                res["success"] = True
        except Exception as e:
            res["result"] = str(e)
            res["success"] = False
        finally:
            await self.resolve_method_call(tab, method["id"], res)

    async def method_call_listener(self):
        while True:
            try:
                tab = await get_tab("QuickAccess")
                break
            except:
                await sleep(1)
        await tab.open_websocket()
        await tab._send_devtools_cmd({"id": 1, "method": "Runtime.discardConsoleEntries"})
        await tab._send_devtools_cmd({"id": 1, "method": "Runtime.enable"})
        async for message in tab.listen_for_message():
            data = message.json()
            if not "id" in data and data["method"] == "Runtime.consoleAPICalled" and data["params"]["type"] == "debug":
                method = loads(data["params"]["args"][0]["value"])
                self.loop.create_task(self.handle_method_call(method, tab))

    def run(self):
        return run_app(self.web_app, host=CONFIG["server_host"], port=CONFIG["server_port"], loop=self.loop, access_log=None)

if __name__ == "__main__":
    PluginManager().run()