# Full imports import json # import pprint # from pprint import pformat # Partial imports from aiohttp import ClientSession from asyncio import sleep from hashlib import sha256 from io import BytesIO from logging import getLogger from os import R_OK, W_OK, path, listdir, access, mkdir from shutil import rmtree from subprocess import call from time import time from zipfile import ZipFile # Local modules from helpers import ( get_ssl_context, get_user, get_user_group, download_remote_binary_to_path, ) from injector import get_gamepadui_tab logger = getLogger("Browser") class PluginInstallContext: def __init__(self, artifact, name, version, hash) -> None: self.artifact = artifact self.name = name self.version = version self.hash = hash class PluginBrowser: def __init__(self, plugin_path, plugins, loader) -> None: self.plugin_path = plugin_path self.plugins = plugins self.loader = loader self.install_requests = {} def _unzip_to_plugin_dir(self, zip, name, hash): zip_hash = sha256(zip.getbuffer()).hexdigest() if hash and (zip_hash != hash): return False zip_file = ZipFile(zip) zip_file.extractall(self.plugin_path) plugin_dir = self.find_plugin_folder(name) code_chown = call( ["chown", "-R", get_user() + ":" + get_user_group(), plugin_dir] ) code_chmod = call(["chmod", "-R", "555", plugin_dir]) if code_chown != 0 or code_chmod != 0: logger.error( f"chown/chmod exited with a non-zero exit code (chown: {code_chown}," f" chmod: {code_chmod})" ) return False return True async def _download_remote_binaries_for_plugin_with_name(self, pluginBasePath): rv = False try: packageJsonPath = path.join(pluginBasePath, "package.json") pluginBinPath = path.join(pluginBasePath, "bin") if access(packageJsonPath, R_OK): with open(packageJsonPath, "r", encoding="utf-8") as f: packageJson = json.load(f) if ( "remote_binary" in packageJson and len(packageJson["remote_binary"]) > 0 ): # create bin directory if needed. call(["chmod", "-R", "777", pluginBasePath]) if access(pluginBasePath, W_OK): if not path.exists(pluginBinPath): mkdir(pluginBinPath) if not access(pluginBinPath, W_OK): call(["chmod", "-R", "777", pluginBinPath]) rv = True for remoteBinary in packageJson["remote_binary"]: # Required Fields. If any Remote Binary is missing these fail the install. binName = remoteBinary["name"] binURL = remoteBinary["url"] binHash = remoteBinary["sha256hash"] if not await download_remote_binary_to_path( binURL, binHash, path.join(pluginBinPath, binName) ): rv = False raise Exception( "Error Downloading Remote Binary" f" {binName}@{binURL} with hash {binHash} to" f" {path.join(pluginBinPath, binName)}" ) call( [ "chown", "-R", get_user() + ":" + get_user_group(), self.plugin_path, ] ) call(["chmod", "-R", "555", pluginBasePath]) else: rv = True logger.debug("No Remote Binaries to Download") except Exception as e: rv = False logger.debug(str(e)) return rv def find_plugin_folder(self, name): for folder in listdir(self.plugin_path): try: with open( path.join(self.plugin_path, folder, "plugin.json"), "r", encoding="utf-8", ) as f: plugin = json.load(f) if plugin["name"] == name: return str(path.join(self.plugin_path, folder)) except Exception: logger.debug(f"skipping {folder}") async def uninstall_plugin(self, name): if self.loader.watcher: self.loader.watcher.disabled = True tab = await get_gamepadui_tab() try: logger.info("uninstalling " + name) logger.info(" at dir " + self.find_plugin_folder(name)) logger.debug("calling frontend unload for %s" % str(name)) res = await tab.evaluate_js(f"DeckyPluginLoader.unloadPlugin('{name}')") logger.debug("result of unload from UI: %s", res) # plugins_snapshot = self.plugins.copy() # snapshot_string = pformat(plugins_snapshot) # logger.debug("current plugins: %s", snapshot_string) if self.plugins[name]: logger.debug("Plugin %s was found", name) self.plugins[name].stop() logger.debug("Plugin %s was stopped", name) del self.plugins[name] logger.debug("Plugin %s was removed from the dictionary", name) logger.debug("removing files %s" % str(name)) rmtree(self.find_plugin_folder(name)) except FileNotFoundError: logger.warning(f"Plugin {name} not installed, skipping uninstallation") except Exception as e: logger.error( f"Plugin {name} in {self.find_plugin_folder(name)} was not uninstalled" ) logger.error("Error at %s", exc_info=e) if self.loader.watcher: self.loader.watcher.disabled = False async def _install(self, artifact, name, version, hash): isInstalled = False if self.loader.watcher: self.loader.watcher.disabled = True try: pluginFolderPath = self.find_plugin_folder(name) if pluginFolderPath: isInstalled = True except Exception: logger.error( f"Failed to determine if {name} is already installed, continuing" " anyway." ) logger.info(f"Installing {name} (Version: {version})") async with ClientSession() as client: logger.debug(f"Fetching {artifact}") res = await client.get(artifact, ssl=get_ssl_context()) if res.status == 200: logger.debug("Got 200. Reading...") data = await res.read() logger.debug(f"Read {len(data)} bytes") res_zip = BytesIO(data) if isInstalled: try: logger.debug("Uninstalling existing plugin...") await self.uninstall_plugin(name) except Exception: logger.error(f"Plugin {name} could not be uninstalled.") logger.debug("Unzipping...") ret = self._unzip_to_plugin_dir(res_zip, name, hash) if ret: plugin_dir = self.find_plugin_folder(name) ret = await self._download_remote_binaries_for_plugin_with_name( plugin_dir ) if ret: logger.info(f"Installed {name} (Version: {version})") if name in self.loader.plugins: self.loader.plugins[name].stop() self.loader.plugins.pop(name, None) await sleep(1) self.loader.import_plugin( path.join(plugin_dir, "main.py"), plugin_dir ) else: logger.fatal("Failed Downloading Remote Binaries") else: self.log.fatal(f"SHA-256 Mismatch!!!! {name} (Version: {version})") if self.loader.watcher: self.loader.watcher.disabled = False else: logger.fatal(f"Could not fetch from URL. {await res.text()}") async def request_plugin_install(self, artifact, name, version, hash): request_id = str(time()) self.install_requests[request_id] = PluginInstallContext( artifact, name, version, hash ) tab = await get_gamepadui_tab() await tab.open_websocket() await tab.evaluate_js( f"DeckyPluginLoader.addPluginInstallPrompt('{name}', '{version}'," f" '{request_id}', '{hash}')" ) async def confirm_plugin_install(self, request_id): request = self.install_requests.pop(request_id) await self._install( request.artifact, request.name, request.version, request.hash ) def cancel_plugin_install(self, request_id): self.install_requests.pop(request_id)