diff options
Diffstat (limited to 'main.py')
| -rw-r--r-- | main.py | 339 |
1 files changed, 309 insertions, 30 deletions
@@ -3,6 +3,7 @@ import os import re import shutil import subprocess +from configparser import ConfigParser from pathlib import Path # Toggle to enable overwriting the upscaler DLL from the static remote binary. @@ -62,6 +63,17 @@ LEGACY_FILES = [ "OptiScaler.log", ] +QUICK_SETTING_DEFINITIONS = [ + {"id": "Dx12Upscaler", "section": None, "key": "Dx12Upscaler", "default": "auto"}, + {"id": "FrameGen.Enabled", "section": "FrameGen", "key": "Enabled", "default": "auto"}, + {"id": "FGInput", "section": None, "key": "FGInput", "default": "auto"}, + {"id": "FGOutput", "section": None, "key": "FGOutput", "default": "auto"}, + {"id": "Fsr4ForceCapable", "section": None, "key": "Fsr4ForceCapable", "default": "false"}, + {"id": "Fsr4EnableWatermark", "section": None, "key": "Fsr4EnableWatermark", "default": "false"}, + {"id": "UseHQFont", "section": None, "key": "UseHQFont", "default": "false"}, + {"id": "Menu.Scale", "section": "Menu", "key": "Scale", "default": "1.000000"}, +] + class Plugin: async def _main(self): @@ -152,6 +164,202 @@ class Plugin: return data + def _write_manifest_env(self, manifest_path: Path, updates: dict) -> dict: + data = self._parse_manifest_env(manifest_path) + for key, value in updates.items(): + if value is None: + continue + data[str(key)] = str(value) + + manifest_path.parent.mkdir(parents=True, exist_ok=True) + with open(manifest_path, "w", encoding="utf-8") as manifest: + for key in sorted(data.keys()): + manifest.write(f'{key}="{data[key]}"\n') + return data + + def _normalize_proxy(self, proxy: str | None) -> str: + normalized = (proxy or "winmm").replace(".dll", "").strip().lower() + if normalized not in SUPPORTED_PROXIES: + raise ValueError(f"Unsupported proxy '{proxy}'") + return normalized + + def _find_installed_game(self, appid: str | None = None) -> list[dict]: + games = [] + for library_path in self._steam_library_paths(): + steamapps_path = library_path / "steamapps" + if not steamapps_path.exists(): + continue + + for appmanifest in steamapps_path.glob("appmanifest_*.acf"): + game_info = {"appid": "", "name": "", "library_path": str(library_path)} + try: + with open(appmanifest, "r", encoding="utf-8", errors="replace") as file: + for line in file: + if '"appid"' in line: + game_info["appid"] = line.split('"appid"', 1)[1].strip().strip('"') + if '"name"' in line: + game_info["name"] = line.split('"name"', 1)[1].strip().strip('"') + except Exception as exc: + decky.logger.error(f"Skipping {appmanifest}: {exc}") + + if not game_info["appid"] or not game_info["name"]: + continue + if "Proton" in game_info["name"] or "Steam Linux Runtime" in game_info["name"]: + continue + if appid is None or str(game_info["appid"]) == str(appid): + games.append(game_info) + + deduped = {} + for game in games: + deduped[str(game["appid"])] = game + return list(deduped.values()) + + def _compatdata_dir_for_appid(self, appid: str, create: bool = False) -> Path | None: + compatdata_dirs = self._compatdata_dirs_for_appid(str(appid)) + if compatdata_dirs: + return compatdata_dirs[0] + + if not create: + return None + + installed_games = self._find_installed_game(appid) + if not installed_games: + return None + + library_path = Path(installed_games[0]["library_path"]) + compatdata_dir = library_path / "steamapps" / "compatdata" / str(appid) + compatdata_dir.mkdir(parents=True, exist_ok=True) + return compatdata_dir + + def _managed_paths_for_appid(self, appid: str, create: bool = False) -> dict | None: + compatdata_dir = self._compatdata_dir_for_appid(str(appid), create=create) + if not compatdata_dir: + return None + + managed_root = compatdata_dir / MANAGED_DIRNAME + system32 = compatdata_dir / "pfx" / "drive_c" / "windows" / "system32" + if create: + managed_root.mkdir(parents=True, exist_ok=True) + + return { + "compatdata_dir": compatdata_dir, + "managed_root": managed_root, + "manifest_path": managed_root / MANIFEST_FILENAME, + "managed_ini": managed_root / "OptiScaler.ini", + "system32": system32, + "live_ini": system32 / "OptiScaler.ini", + } + + def _ensure_managed_ini(self, appid: str) -> dict: + paths = self._managed_paths_for_appid(appid, create=True) + if not paths: + raise FileNotFoundError(f"Unable to resolve compatdata path for app {appid}") + + managed_ini = paths["managed_ini"] + if managed_ini.exists(): + return paths + + bundle_ini = self._bundle_path() / "OptiScaler.ini" + if not bundle_ini.exists(): + raise FileNotFoundError("OptiScaler runtime is not installed; missing bundled OptiScaler.ini") + + shutil.copy2(bundle_ini, managed_ini) + self._modify_optiscaler_ini(managed_ini) + return paths + + def _read_ini_config(self, ini_path: Path) -> ConfigParser: + config = ConfigParser(interpolation=None) + config.optionxform = str + if ini_path.exists(): + config.read(ini_path, encoding="utf-8") + return config + + def _find_sections_for_key(self, config: ConfigParser, key: str) -> list[str]: + return [section for section in config.sections() if config.has_option(section, key)] + + def _resolve_setting_definition(self, setting_id: str) -> dict | None: + for definition in QUICK_SETTING_DEFINITIONS: + if definition["id"] == setting_id: + return definition + return None + + def _resolve_setting_section(self, config: ConfigParser, definition: dict) -> str | None: + hinted_section = definition.get("section") + key = definition["key"] + + if hinted_section and config.has_option(hinted_section, key): + return hinted_section + + matching_sections = self._find_sections_for_key(config, key) + if len(matching_sections) == 1: + return matching_sections[0] + if hinted_section: + return hinted_section + if matching_sections: + return matching_sections[0] + return None + + def _extract_quick_settings(self, ini_path: Path) -> dict: + config = self._read_ini_config(ini_path) + settings = {} + for definition in QUICK_SETTING_DEFINITIONS: + resolved_section = self._resolve_setting_section(config, definition) + value = definition["default"] + if resolved_section and config.has_option(resolved_section, definition["key"]): + value = config.get(resolved_section, definition["key"]) + settings[definition["id"]] = value + return settings + + def _replace_ini_values(self, ini_path: Path, resolved_updates: list[tuple[str, str, str]]) -> None: + lines = [] + if ini_path.exists(): + with open(ini_path, "r", encoding="utf-8", errors="replace") as ini_file: + lines = ini_file.readlines() + + for section, key, value in resolved_updates: + section_pattern = re.compile(rf"^\s*\[{re.escape(section)}]\s*$") + any_section_pattern = re.compile(r"^\s*\[.*]\s*$") + key_pattern = re.compile(rf"^\s*{re.escape(key)}\s*=.*$") + + in_target_section = False + section_found = False + inserted_or_updated = False + insert_index = len(lines) + + for index, line in enumerate(lines): + if section_pattern.match(line): + in_target_section = True + section_found = True + insert_index = index + 1 + continue + + if in_target_section and any_section_pattern.match(line): + insert_index = index + break + + if in_target_section: + if key_pattern.match(line): + newline = "\n" if line.endswith("\n") else "" + lines[index] = f"{key}={value}{newline}" + inserted_or_updated = True + break + insert_index = index + 1 + + if inserted_or_updated: + continue + + if not section_found: + if lines and lines[-1].strip(): + lines.append("\n") + lines.append(f"[{section}]\n") + lines.append(f"{key}={value}\n") + continue + + lines[insert_index:insert_index] = [f"{key}={value}\n"] + + with open(ini_path, "w", encoding="utf-8") as ini_file: + ini_file.writelines(lines) + def _disable_hq_font_auto(self, ini_file: Path) -> bool: try: if not ini_file.exists(): @@ -494,40 +702,111 @@ class Plugin: return {"status": "success", "message": "\n".join(cleanup_messages)} - async def list_installed_games(self) -> dict: + async def get_game_config(self, appid: str) -> dict: try: - games = [] - for library_path in self._steam_library_paths(): - steamapps_path = library_path / "steamapps" - if not steamapps_path.exists(): - continue + paths = self._ensure_managed_ini(str(appid)) + manifest = self._parse_manifest_env(paths["manifest_path"]) + installed_game = self._find_installed_game(str(appid)) + game_name = installed_game[0]["name"] if installed_game else str(appid) - for appmanifest in steamapps_path.glob("appmanifest_*.acf"): - game_info = {"appid": "", "name": ""} - try: - with open(appmanifest, "r", encoding="utf-8", errors="replace") as file: - for line in file: - if '"appid"' in line: - game_info["appid"] = line.split('"appid"', 1)[1].strip().strip('"') - if '"name"' in line: - game_info["name"] = line.split('"name"', 1)[1].strip().strip('"') - except Exception as exc: - decky.logger.error(f"Skipping {appmanifest}: {exc}") - - if game_info["appid"] and game_info["name"]: - games.append(game_info) - - filtered_games = [ - g - for g in games - if "Proton" not in g["name"] and "Steam Linux Runtime" not in g["name"] - ] + source_ini = paths["live_ini"] if paths["live_ini"].exists() else paths["managed_ini"] + with open(source_ini, "r", encoding="utf-8", errors="replace") as ini_file: + raw_ini = ini_file.read() - deduped = {} - for game in filtered_games: - deduped[str(game["appid"])] = game + preferred_proxy = manifest.get("PREFERRED_PROXY") or manifest.get("MANAGED_PROXY") or "winmm" + proxy = self._normalize_proxy(preferred_proxy) + settings = self._extract_quick_settings(source_ini) - return {"status": "success", "games": list(deduped.values())} + return { + "status": "success", + "appid": str(appid), + "name": game_name, + "proxy": proxy, + "settings": settings, + "raw_ini": raw_ini, + "managed_exists": paths["managed_ini"].exists(), + "live_available": paths["live_ini"].exists(), + "paths": { + "compatdata": str(paths["compatdata_dir"]), + "managed_root": str(paths["managed_root"]), + "managed_ini": str(paths["managed_ini"]), + "system32": str(paths["system32"]), + "live_ini": str(paths["live_ini"]), + }, + } + except Exception as exc: + decky.logger.error(f"get_game_config failed for {appid}: {exc}") + return {"status": "error", "message": str(exc)} + + async def save_game_config( + self, + appid: str, + settings: dict | None = None, + proxy: str | None = None, + apply_live: bool = False, + raw_ini: str | None = None, + ) -> dict: + try: + paths = self._ensure_managed_ini(str(appid)) + manifest_updates = {} + if proxy: + manifest_updates["PREFERRED_PROXY"] = self._normalize_proxy(proxy) + if manifest_updates: + self._write_manifest_env(paths["manifest_path"], manifest_updates) + + managed_ini = paths["managed_ini"] + resolved_updates = [] + settings = settings or {} + + if raw_ini is not None: + with open(managed_ini, "w", encoding="utf-8") as ini_file: + ini_file.write(raw_ini) + elif settings: + config = self._read_ini_config(managed_ini) + for setting_id, value in settings.items(): + definition = self._resolve_setting_definition(setting_id) + if not definition: + continue + section = self._resolve_setting_section(config, definition) or definition.get("section") or "OptiScaler" + resolved_updates.append((section, definition["key"], str(value))) + if resolved_updates: + self._replace_ini_values(managed_ini, resolved_updates) + + self._disable_hq_font_auto(managed_ini) + + live_applied = False + if apply_live and paths["live_ini"].exists(): + if raw_ini is not None: + with open(paths["live_ini"], "w", encoding="utf-8") as ini_file: + ini_file.write(raw_ini) + elif resolved_updates: + self._replace_ini_values(paths["live_ini"], resolved_updates) + self._disable_hq_font_auto(paths["live_ini"]) + live_applied = True + + message = "Saved OptiScaler config" + if apply_live: + message = ( + "Saved OptiScaler config and applied it to the running game's live prefix copy" + if live_applied + else "Saved OptiScaler config, but no live prefix copy was available to update" + ) + + return { + "status": "success", + "message": message, + "live_applied": live_applied, + "proxy": self._parse_manifest_env(paths["manifest_path"]).get("PREFERRED_PROXY", proxy or "winmm"), + } + except Exception as exc: + decky.logger.error(f"save_game_config failed for {appid}: {exc}") + return {"status": "error", "message": str(exc)} + + async def list_installed_games(self) -> dict: + try: + games = self._find_installed_game() + sanitized_games = [{"appid": game["appid"], "name": game["name"]} for game in games] + return {"status": "success", "games": sanitized_games} except Exception as exc: decky.logger.error(str(exc)) return {"status": "error", "message": str(exc)} |
