diff options
| -rw-r--r-- | main.py | 510 | ||||
| -rw-r--r-- | py_modules/lsfg_vk/__init__.py | 14 | ||||
| -rw-r--r-- | py_modules/lsfg_vk/base_service.py | 103 | ||||
| -rw-r--r-- | py_modules/lsfg_vk/configuration.py | 175 | ||||
| -rw-r--r-- | py_modules/lsfg_vk/constants.py | 54 | ||||
| -rw-r--r-- | py_modules/lsfg_vk/dll_detection.py | 120 | ||||
| -rw-r--r-- | py_modules/lsfg_vk/installation.py | 225 | ||||
| -rw-r--r-- | py_modules/lsfg_vk/plugin.py | 159 | ||||
| -rw-r--r-- | py_modules/lsfg_vk/types.py | 71 |
9 files changed, 930 insertions, 501 deletions
@@ -1,505 +1,13 @@ -import os -import zipfile -import shutil -import subprocess -import tempfile -from typing import Dict, Any - -# The decky plugin module is located at decky-loader/plugin -# For easy intellisense checkout the decky-loader code repo -# and add the `decky-loader/plugin/imports` path to `python.analysis.extraPaths` in `.vscode/settings.json` -import decky -import asyncio - - -class InstallationService: - """Service for handling lsfg-vk installation and uninstallation""" - - def __init__(self): - self.user_home = os.path.expanduser("~") - self.local_lib_dir = os.path.join(self.user_home, ".local", "lib") - self.local_share_dir = os.path.join(self.user_home, ".local", "share", "vulkan", "implicit_layer.d") - self.lsfg_script_path = os.path.join(self.user_home, "lsfg") - - # File paths - self.lib_file = os.path.join(self.local_lib_dir, "liblsfg-vk.so") - self.json_file = os.path.join(self.local_share_dir, "VkLayer_LS_frame_generation.json") - - async def install(self) -> Dict[str, Any]: - """Install lsfg-vk by extracting the zip file to ~/.local""" - try: - # Get the path to the lsfg-vk_archlinux.zip file in the bin directory - plugin_dir = os.path.dirname(os.path.realpath(__file__)) - zip_path = os.path.join(plugin_dir, "bin", "lsfg-vk_archlinux.zip") - - # Check if the zip file exists - if not os.path.exists(zip_path): - decky.logger.error(f"lsfg-vk_archlinux.zip not found at {zip_path}") - return {"success": False, "error": "lsfg-vk_archlinux.zip file not found"} - - # Create directories if they don't exist - os.makedirs(self.local_lib_dir, exist_ok=True) - os.makedirs(self.local_share_dir, exist_ok=True) - - # Extract the zip file - with zipfile.ZipFile(zip_path, 'r') as zip_ref: - # Use /tmp for temporary extraction since we may not have write permissions in plugin dir - with tempfile.TemporaryDirectory() as temp_dir: - zip_ref.extractall(temp_dir) - - # Look for the extracted files and copy them to the correct locations - for root, dirs, files in os.walk(temp_dir): - for file in files: - src_file = os.path.join(root, file) - if file.endswith('.so'): - # Copy library files to ~/.local/lib - dst_file = os.path.join(self.local_lib_dir, file) - shutil.copy2(src_file, dst_file) - decky.logger.info(f"Copied {file} to {dst_file}") - elif file.endswith('.json'): - # Copy JSON files to ~/.local/share/vulkan/implicit_layer.d - dst_file = os.path.join(self.local_share_dir, file) - shutil.copy2(src_file, dst_file) - decky.logger.info(f"Copied {file} to {dst_file}") - - # Create the lsfg script - self._create_lsfg_script() - - decky.logger.info("lsfg-vk installed successfully") - return {"success": True, "message": "lsfg-vk installed successfully"} - - except Exception as e: - decky.logger.error(f"Error installing lsfg-vk: {str(e)}") - return {"success": False, "error": str(e)} - - def _create_lsfg_script(self): - """Create the lsfg script in home directory""" - script_content = """#!/bin/bash - -export ENABLE_LSFG=1 -export LSFG_MULTIPLIER=2 -export LSFG_FLOW_SCALE=1.0 -# export LSFG_HDR=1 -# export LSFG_PERF_MODE=1 -# export MESA_VK_WSI_PRESENT_MODE=immediate # - disable vsync - -# Execute the passed command with the environment variables set -exec "$@" """ - - with open(self.lsfg_script_path, 'w') as script_file: - script_file.write(script_content) - - # Make the script executable - os.chmod(self.lsfg_script_path, 0o755) - decky.logger.info(f"Created executable lsfg script at {self.lsfg_script_path}") - - async def check_installation(self) -> Dict[str, Any]: - """Check if lsfg-vk is already installed""" - try: - lib_exists = os.path.exists(self.lib_file) - json_exists = os.path.exists(self.json_file) - script_exists = os.path.exists(self.lsfg_script_path) - - decky.logger.info(f"Installation check: lib={lib_exists}, json={json_exists}, script={script_exists}") - - return { - "installed": lib_exists and json_exists, - "lib_exists": lib_exists, - "json_exists": json_exists, - "script_exists": script_exists, - "lib_path": self.lib_file, - "json_path": self.json_file, - "script_path": self.lsfg_script_path - } - - except Exception as e: - decky.logger.error(f"Error checking lsfg-vk installation: {str(e)}") - return {"installed": False, "error": str(e)} - - async def uninstall(self) -> Dict[str, Any]: - """Uninstall lsfg-vk by removing the installed files""" - try: - removed_files = [] - - # Remove library file if it exists - if os.path.exists(self.lib_file): - os.remove(self.lib_file) - removed_files.append(self.lib_file) - decky.logger.info(f"Removed {self.lib_file}") - - # Remove JSON file if it exists - if os.path.exists(self.json_file): - os.remove(self.json_file) - removed_files.append(self.json_file) - decky.logger.info(f"Removed {self.json_file}") - - # Remove lsfg script if it exists - if os.path.exists(self.lsfg_script_path): - os.remove(self.lsfg_script_path) - removed_files.append(self.lsfg_script_path) - decky.logger.info(f"Removed {self.lsfg_script_path}") - - if not removed_files: - return {"success": True, "message": "No lsfg-vk files found to remove"} - - decky.logger.info("lsfg-vk uninstalled successfully") - return { - "success": True, - "message": f"lsfg-vk uninstalled successfully. Removed {len(removed_files)} files.", - "removed_files": removed_files - } - - except Exception as e: - decky.logger.error(f"Error uninstalling lsfg-vk: {str(e)}") - return {"success": False, "error": str(e)} - - def cleanup_on_uninstall(self) -> None: - """Clean up lsfg-vk files when the plugin is uninstalled""" - try: - decky.logger.info(f"Checking for lsfg-vk files to clean up:") - decky.logger.info(f" Library file: {self.lib_file}") - decky.logger.info(f" JSON file: {self.json_file}") - decky.logger.info(f" lsfg script: {self.lsfg_script_path}") - - removed_files = [] - - # Remove library file if it exists - if os.path.exists(self.lib_file): - decky.logger.info(f"Found library file, attempting to remove: {self.lib_file}") - try: - os.remove(self.lib_file) - removed_files.append(self.lib_file) - decky.logger.info(f"Successfully removed {self.lib_file}") - except Exception as e: - decky.logger.error(f"Failed to remove {self.lib_file}: {str(e)}") - else: - decky.logger.info(f"Library file not found: {self.lib_file}") - - # Remove JSON file if it exists - if os.path.exists(self.json_file): - decky.logger.info(f"Found JSON file, attempting to remove: {self.json_file}") - try: - os.remove(self.json_file) - removed_files.append(self.json_file) - decky.logger.info(f"Successfully removed {self.json_file}") - except Exception as e: - decky.logger.error(f"Failed to remove {self.json_file}: {str(e)}") - else: - decky.logger.info(f"JSON file not found: {self.json_file}") - - # Remove lsfg script if it exists - if os.path.exists(self.lsfg_script_path): - decky.logger.info(f"Found lsfg script, attempting to remove: {self.lsfg_script_path}") - try: - os.remove(self.lsfg_script_path) - removed_files.append(self.lsfg_script_path) - decky.logger.info(f"Successfully removed {self.lsfg_script_path}") - except Exception as e: - decky.logger.error(f"Failed to remove {self.lsfg_script_path}: {str(e)}") - else: - decky.logger.info(f"lsfg script not found: {self.lsfg_script_path}") - - if removed_files: - decky.logger.info(f"Cleaned up {len(removed_files)} lsfg-vk files during plugin uninstall: {removed_files}") - else: - decky.logger.info("No lsfg-vk files found to clean up during plugin uninstall") - - except Exception as e: - decky.logger.error(f"Error cleaning up lsfg-vk files during uninstall: {str(e)}") - import traceback - decky.logger.error(f"Traceback: {traceback.format_exc()}") - - -class DllDetectionService: - """Service for detecting Lossless Scaling DLL""" - - async def check_lossless_scaling_dll(self) -> Dict[str, Any]: - """Check if Lossless Scaling DLL is available at the expected paths""" - try: - # Check environment variable first - dll_path = os.getenv("LSFG_DLL_PATH") - if dll_path and dll_path.strip(): - dll_path_str = dll_path.strip() - if os.path.exists(dll_path_str): - return { - "detected": True, - "path": dll_path_str, - "source": "LSFG_DLL_PATH environment variable" - } - - # Check XDG_DATA_HOME path - data_dir = os.getenv("XDG_DATA_HOME") - if data_dir and data_dir.strip(): - dll_path_str = os.path.join(data_dir.strip(), "Steam", "steamapps", "common", "Lossless Scaling", "Lossless.dll") - if os.path.exists(dll_path_str): - return { - "detected": True, - "path": dll_path_str, - "source": "XDG_DATA_HOME Steam directory" - } - - # Check HOME/.local/share path - home_dir = os.getenv("HOME") - if home_dir and home_dir.strip(): - dll_path_str = os.path.join(home_dir.strip(), ".local", "share", "Steam", "steamapps", "common", "Lossless Scaling", "Lossless.dll") - if os.path.exists(dll_path_str): - return { - "detected": True, - "path": dll_path_str, - "source": "HOME/.local/share Steam directory" - } - - # DLL not found in any expected location - return { - "detected": False, - "path": None, - "source": None, - "message": "Lossless Scaling DLL not found in expected locations" - } - - except Exception as e: - decky.logger.error(f"Error checking Lossless Scaling DLL: {str(e)}") - return { - "detected": False, - "path": None, - "source": None, - "error": str(e) - } - - -class ConfigurationService: - """Service for managing lsfg script configuration""" - - def __init__(self): - self.user_home = os.path.expanduser("~") - self.lsfg_script_path = os.path.join(self.user_home, "lsfg") - - async def get_config(self) -> Dict[str, Any]: - """Read current lsfg script configuration""" - try: - if not os.path.exists(self.lsfg_script_path): - return { - "success": False, - "error": "lsfg script not found" - } - - with open(self.lsfg_script_path, 'r') as f: - content = f.read() - - # Parse the script content to extract current values - config = { - "enable_lsfg": False, - "multiplier": 2, - "flow_scale": 1.0, - "hdr": False, - "perf_mode": False, - "immediate_mode": False - } - - lines = content.split('\n') - for line in lines: - line = line.strip() - - # Handle ENABLE_LSFG - check if it's commented out or not - if line.startswith('export ENABLE_LSFG='): - try: - value = line.split('=')[1].strip() - config["enable_lsfg"] = value == '1' - except: - pass - elif line.startswith('# export ENABLE_LSFG='): - config["enable_lsfg"] = False - - # Handle LSFG_MULTIPLIER - elif line.startswith('export LSFG_MULTIPLIER='): - try: - value = line.split('=')[1].strip() - config["multiplier"] = int(value) - except: - pass - - # Handle LSFG_FLOW_SCALE - elif line.startswith('export LSFG_FLOW_SCALE='): - try: - value = line.split('=')[1].strip() - config["flow_scale"] = float(value) - except: - pass - - # Handle LSFG_HDR - check if it's commented out or not - elif line.startswith('export LSFG_HDR='): - try: - value = line.split('=')[1].strip() - config["hdr"] = value == '1' - except: - pass - elif line.startswith('# export LSFG_HDR='): - config["hdr"] = False - - # Handle LSFG_PERF_MODE - check if it's commented out or not - elif line.startswith('export LSFG_PERF_MODE='): - try: - value = line.split('=')[1].strip() - config["perf_mode"] = value == '1' - except: - pass - elif line.startswith('# export LSFG_PERF_MODE='): - config["perf_mode"] = False - - # Handle MESA_VK_WSI_PRESENT_MODE - check if it's commented out or not - elif line.startswith('export MESA_VK_WSI_PRESENT_MODE='): - try: - value = line.split('=')[1].strip() - # Remove any comments after the value - value = value.split('#')[0].strip() - config["immediate_mode"] = value == 'immediate' - except: - pass - elif line.startswith('# export MESA_VK_WSI_PRESENT_MODE='): - config["immediate_mode"] = False - - decky.logger.info(f"Parsed lsfg config: {config}") - - return { - "success": True, - "config": config - } - - except Exception as e: - decky.logger.error(f"Error reading lsfg config: {str(e)}") - return { - "success": False, - "error": str(e) - } - - async def update_config(self, enable_lsfg: bool, multiplier: int, flow_scale: float, - hdr: bool, perf_mode: bool, immediate_mode: bool) -> Dict[str, Any]: - """Update lsfg script configuration""" - try: - # Create script content based on parameters - script_content = "#!/bin/bash\n\n" - - if enable_lsfg: - script_content += "export ENABLE_LSFG=1\n" - else: - script_content += "# export ENABLE_LSFG=1\n" - - script_content += f"export LSFG_MULTIPLIER={multiplier}\n" - script_content += f"export LSFG_FLOW_SCALE={flow_scale}\n" - - if hdr: - script_content += "export LSFG_HDR=1\n" - else: - script_content += "# export LSFG_HDR=1\n" - - if perf_mode: - script_content += "export LSFG_PERF_MODE=1\n" - else: - script_content += "# export LSFG_PERF_MODE=1\n" - - if immediate_mode: - script_content += "export MESA_VK_WSI_PRESENT_MODE=immediate # - disable vsync\n" - else: - script_content += "# export MESA_VK_WSI_PRESENT_MODE=immediate # - disable vsync\n" - - # Add the exec line to allow the script to execute passed commands - script_content += "\n# Execute the passed command with the environment variables set\n" - script_content += "exec \"$@\"\n" - - # Write the updated script - with open(self.lsfg_script_path, 'w') as f: - f.write(script_content) - - # Make sure it's executable - os.chmod(self.lsfg_script_path, 0o755) - - decky.logger.info(f"Updated lsfg script configuration: enable={enable_lsfg}, multiplier={multiplier}, flow_scale={flow_scale}, hdr={hdr}, perf_mode={perf_mode}, immediate_mode={immediate_mode}") - - return { - "success": True, - "message": "lsfg configuration updated successfully" - } - - except Exception as e: - decky.logger.error(f"Error updating lsfg config: {str(e)}") - return { - "success": False, - "error": str(e) - } - +Main entry point for the lsfg-vk Decky Loader plugin. -class Plugin: - def __init__(self): - # Initialize services - self.installation_service = InstallationService() - self.dll_detection_service = DllDetectionService() - self.configuration_service = ConfigurationService() - - # Installation methods - async def install_lsfg_vk(self) -> dict: - """Install lsfg-vk by extracting the zip file to ~/.local""" - return await self.installation_service.install() - - async def check_lsfg_vk_installed(self) -> dict: - """Check if lsfg-vk is already installed""" - return await self.installation_service.check_installation() - - async def uninstall_lsfg_vk(self) -> dict: - """Uninstall lsfg-vk by removing the installed files""" - return await self.installation_service.uninstall() - - # DLL detection methods - async def check_lossless_scaling_dll(self) -> dict: - """Check if Lossless Scaling DLL is available at the expected paths""" - return await self.dll_detection_service.check_lossless_scaling_dll() - - # Configuration methods - async def get_lsfg_config(self) -> dict: - """Read current lsfg script configuration""" - return await self.configuration_service.get_config() - - async def update_lsfg_config(self, enable_lsfg: bool, multiplier: int, flow_scale: float, - hdr: bool, perf_mode: bool, immediate_mode: bool) -> dict: - """Update lsfg script configuration""" - return await self.configuration_service.update_config( - enable_lsfg, multiplier, flow_scale, hdr, perf_mode, immediate_mode - ) - - # Plugin lifecycle methods - async def _main(self): - """Asyncio-compatible long-running code, executed in a task when the plugin is loaded""" - decky.logger.info("Lossless Scaling loaded!") - - async def _unload(self): - """Function called first during the unload process""" - decky.logger.info("Lossless Scaling unloading") +This file imports and exposes the Plugin class from the lsfg_vk package. +The actual implementation has been refactored into separate service modules +for better maintainability and testability. +""" - async def _uninstall(self): - """Function called after `_unload` during uninstall""" - decky.logger.info("Lossless Scaling uninstalled - starting cleanup") - - # Clean up lsfg-vk files when the plugin is uninstalled - self.installation_service.cleanup_on_uninstall() - - decky.logger.info("Lossless Scaling uninstall cleanup completed") +# Import the refactored Plugin class +from lsfg_vk import Plugin - async def _migration(self): - """Migrations that should be performed before entering `_main()`""" - decky.logger.info("Migrating") - # Here's a migration example for logs: - # - `~/.config/decky-template/template.log` will be migrated to `decky.decky_LOG_DIR/template.log` - decky.migrate_logs(os.path.join(decky.DECKY_USER_HOME, - ".config", "decky-template", "template.log")) - # Here's a migration example for settings: - # - `~/homebrew/settings/template.json` is migrated to `decky.decky_SETTINGS_DIR/template.json` - # - `~/.config/decky-template/` all files and directories under this root are migrated to `decky.decky_SETTINGS_DIR/` - decky.migrate_settings( - os.path.join(decky.DECKY_HOME, "settings", "template.json"), - os.path.join(decky.DECKY_USER_HOME, ".config", "decky-template")) - # Here's a migration example for runtime data: - # - `~/homebrew/template/` all files and directories under this root are migrated to `decky.decky_RUNTIME_DIR/` - # - `~/.local/share/decky-template/` all files and directories under this root are migrated to `decky.decky_RUNTIME_DIR/` - decky.migrate_runtime( - os.path.join(decky.DECKY_HOME, "template"), - os.path.join(decky.DECKY_USER_HOME, ".local", "share", "decky-template")) +# Re-export Plugin at module level for Decky Loader compatibility +__all__ = ['Plugin'] diff --git a/py_modules/lsfg_vk/__init__.py b/py_modules/lsfg_vk/__init__.py new file mode 100644 index 0000000..8343853 --- /dev/null +++ b/py_modules/lsfg_vk/__init__.py @@ -0,0 +1,14 @@ +""" +lsfg-vk plugin package for Decky Loader. + +This package provides services for installing and managing the lsfg-vk +Vulkan layer for Lossless Scaling frame generation. +""" + +# Import will be available once plugin.py exists +try: + from .plugin import Plugin + __all__ = ['Plugin'] +except ImportError: + # During development, plugin may not exist yet + __all__ = [] diff --git a/py_modules/lsfg_vk/base_service.py b/py_modules/lsfg_vk/base_service.py new file mode 100644 index 0000000..b547759 --- /dev/null +++ b/py_modules/lsfg_vk/base_service.py @@ -0,0 +1,103 @@ +""" +Base service class with common functionality. +""" + +import os +import shutil +import tempfile +from pathlib import Path +from typing import Any, Optional + +from .constants import LOCAL_LIB, LOCAL_SHARE_BASE, VULKAN_LAYER_DIR, SCRIPT_NAME + + +class BaseService: + """Base service class with common functionality""" + + def __init__(self, logger: Optional[Any] = None): + """Initialize base service + + Args: + logger: Logger instance, defaults to decky.logger if None + """ + if logger is None: + import decky + self.log = decky.logger + else: + self.log = logger + + # Initialize common paths using pathlib + self.user_home = Path.home() + self.local_lib_dir = self.user_home / LOCAL_LIB + self.local_share_dir = self.user_home / VULKAN_LAYER_DIR + self.lsfg_script_path = self.user_home / SCRIPT_NAME + + def _ensure_directories(self) -> None: + """Create necessary directories if they don't exist""" + self.local_lib_dir.mkdir(parents=True, exist_ok=True) + self.local_share_dir.mkdir(parents=True, exist_ok=True) + self.log.info(f"Ensured directories exist: {self.local_lib_dir}, {self.local_share_dir}") + + def _remove_if_exists(self, path: Path) -> bool: + """Remove a file if it exists + + Args: + path: Path to the file to remove + + Returns: + True if file was removed, False if it didn't exist + + Raises: + OSError: If removal fails + """ + if path.exists(): + try: + path.unlink() + self.log.info(f"Removed {path}") + return True + except OSError as e: + self.log.error(f"Failed to remove {path}: {e}") + raise + else: + self.log.info(f"File not found: {path}") + return False + + def _atomic_write(self, path: Path, content: str, mode: int = 0o644) -> None: + """Write content to a file atomically + + Args: + path: Target file path + content: Content to write + mode: File permissions (default: 0o644) + + Raises: + OSError: If write fails + """ + # Create temporary file in the same directory to ensure atomic move + temp_path = None + try: + with tempfile.NamedTemporaryFile( + mode='w', + dir=path.parent, + delete=False, + prefix=f'.{path.name}.', + suffix='.tmp' + ) as temp_file: + temp_file.write(content) + temp_path = Path(temp_file.name) + + # Set permissions before moving + temp_path.chmod(mode) + + # Atomic move + temp_path.replace(path) + self.log.info(f"Atomically wrote to {path}") + + except Exception: + # Clean up temp file if something went wrong + if temp_path and temp_path.exists(): + try: + temp_path.unlink() + except OSError: + pass # Best effort cleanup + raise diff --git a/py_modules/lsfg_vk/configuration.py b/py_modules/lsfg_vk/configuration.py new file mode 100644 index 0000000..f5e2981 --- /dev/null +++ b/py_modules/lsfg_vk/configuration.py @@ -0,0 +1,175 @@ +""" +Configuration service for lsfg script management. +""" + +import re +from pathlib import Path +from typing import Dict, Any + +from .base_service import BaseService +from .constants import LSFG_SCRIPT_TEMPLATE +from .types import ConfigurationResponse, ConfigurationData + + +class ConfigurationService(BaseService): + """Service for managing lsfg script configuration""" + + def get_config(self) -> ConfigurationResponse: + """Read current lsfg script configuration + + Returns: + ConfigurationResponse with current configuration or error + """ + try: + if not self.lsfg_script_path.exists(): + return { + "success": False, + "config": None, + "message": None, + "error": "lsfg script not found" + } + + content = self.lsfg_script_path.read_text() + config = self._parse_script_content(content) + + self.log.info(f"Parsed lsfg config: {config}") + + return { + "success": True, + "config": config, + "message": None, + "error": None + } + + except (OSError, IOError) as e: + error_msg = f"Error reading lsfg config: {str(e)}" + self.log.error(error_msg) + return { + "success": False, + "config": None, + "message": None, + "error": str(e) + } + + def _parse_script_content(self, content: str) -> ConfigurationData: + """Parse script content to extract configuration values + + Args: + content: Script file content + + Returns: + ConfigurationData with parsed values + """ + config: ConfigurationData = { + "enable_lsfg": False, + "multiplier": 2, + "flow_scale": 1.0, + "hdr": False, + "perf_mode": False, + "immediate_mode": False + } + + lines = content.split('\n') + for line in lines: + line = line.strip() + + # Parse ENABLE_LSFG + if match := re.match(r'^(#\s*)?export\s+ENABLE_LSFG=(\d+)', line): + config["enable_lsfg"] = not bool(match.group(1)) and match.group(2) == '1' + + # Parse LSFG_MULTIPLIER + elif match := re.match(r'^export\s+LSFG_MULTIPLIER=(\d+)', line): + try: + config["multiplier"] = int(match.group(1)) + except ValueError: + pass + + # Parse LSFG_FLOW_SCALE + elif match := re.match(r'^export\s+LSFG_FLOW_SCALE=([0-9]*\.?[0-9]+)', line): + try: + config["flow_scale"] = float(match.group(1)) + except ValueError: + pass + + # Parse LSFG_HDR + elif match := re.match(r'^(#\s*)?export\s+LSFG_HDR=(\d+)', line): + config["hdr"] = not bool(match.group(1)) and match.group(2) == '1' + + # Parse LSFG_PERF_MODE + elif match := re.match(r'^(#\s*)?export\s+LSFG_PERF_MODE=(\d+)', line): + config["perf_mode"] = not bool(match.group(1)) and match.group(2) == '1' + + # Parse MESA_VK_WSI_PRESENT_MODE + elif match := re.match(r'^(#\s*)?export\s+MESA_VK_WSI_PRESENT_MODE=([^\s#]+)', line): + config["immediate_mode"] = not bool(match.group(1)) and match.group(2) == 'immediate' + + return config + + def update_config(self, enable_lsfg: bool, multiplier: int, flow_scale: float, + hdr: bool, perf_mode: bool, immediate_mode: bool) -> ConfigurationResponse: + """Update lsfg script configuration + + Args: + enable_lsfg: Whether to enable LSFG + multiplier: LSFG multiplier value + flow_scale: LSFG flow scale value + hdr: Whether to enable HDR + perf_mode: Whether to enable performance mode + immediate_mode: Whether to enable immediate present mode (disable vsync) + + Returns: + ConfigurationResponse with success status + """ + try: + # Generate script content using template + script_content = self._generate_script_content( + enable_lsfg, multiplier, flow_scale, hdr, perf_mode, immediate_mode + ) + + # Write the updated script atomically + self._atomic_write(self.lsfg_script_path, script_content, 0o755) + + self.log.info(f"Updated lsfg script configuration: enable={enable_lsfg}, " + f"multiplier={multiplier}, flow_scale={flow_scale}, hdr={hdr}, " + f"perf_mode={perf_mode}, immediate_mode={immediate_mode}") + + return { + "success": True, + "config": None, + "message": "lsfg configuration updated successfully", + "error": None + } + + except (OSError, IOError) as e: + error_msg = f"Error updating lsfg config: {str(e)}" + self.log.error(error_msg) + return { + "success": False, + "config": None, + "message": None, + "error": str(e) + } + + def _generate_script_content(self, enable_lsfg: bool, multiplier: int, flow_scale: float, + hdr: bool, perf_mode: bool, immediate_mode: bool) -> str: + """Generate script content from configuration parameters + + Args: + enable_lsfg: Whether to enable LSFG + multiplier: LSFG multiplier value + flow_scale: LSFG flow scale value + hdr: Whether to enable HDR + perf_mode: Whether to enable performance mode + immediate_mode: Whether to enable immediate present mode + + Returns: + Generated script content + """ + return LSFG_SCRIPT_TEMPLATE.format( + enable_lsfg="export ENABLE_LSFG=1" if enable_lsfg else "# export ENABLE_LSFG=1", + multiplier=multiplier, + flow_scale=flow_scale, + hdr="export LSFG_HDR=1" if hdr else "# export LSFG_HDR=1", + perf_mode="export LSFG_PERF_MODE=1" if perf_mode else "# export LSFG_PERF_MODE=1", + immediate_mode="export MESA_VK_WSI_PRESENT_MODE=immediate # - disable vsync" if immediate_mode else "# export MESA_VK_WSI_PRESENT_MODE=immediate # - disable vsync" + ) diff --git a/py_modules/lsfg_vk/constants.py b/py_modules/lsfg_vk/constants.py new file mode 100644 index 0000000..28246c2 --- /dev/null +++ b/py_modules/lsfg_vk/constants.py @@ -0,0 +1,54 @@ +""" +Constants for the lsfg-vk plugin. +""" + +from pathlib import Path + +# Directory paths +LOCAL_LIB = ".local/lib" +LOCAL_SHARE_BASE = ".local/share" +VULKAN_LAYER_DIR = ".local/share/vulkan/implicit_layer.d" + +# File names +SCRIPT_NAME = "lsfg" +LIB_FILENAME = "liblsfg-vk.so" +JSON_FILENAME = "VkLayer_LS_frame_generation.json" +ZIP_FILENAME = "lsfg-vk_archlinux.zip" + +# File extensions +SO_EXT = ".so" +JSON_EXT = ".json" + +# Directory for the zip file +BIN_DIR = "bin" + +# Lossless Scaling paths +STEAM_COMMON_PATH = Path("steamapps/common/Lossless Scaling") +LOSSLESS_DLL_NAME = "Lossless.dll" + +# Script template +LSFG_SCRIPT_TEMPLATE = """#!/bin/bash + +{enable_lsfg} +export LSFG_MULTIPLIER={multiplier} +export LSFG_FLOW_SCALE={flow_scale} +{hdr} +{perf_mode} +{immediate_mode} + +# Execute the passed command with the environment variables set +exec "$@" +""" + +# Environment variable names +ENV_LSFG_DLL_PATH = "LSFG_DLL_PATH" +ENV_XDG_DATA_HOME = "XDG_DATA_HOME" +ENV_HOME = "HOME" + +# Default configuration values +DEFAULT_MULTIPLIER = 2 +DEFAULT_FLOW_SCALE = 1.0 +DEFAULT_ENABLE_LSFG = True +DEFAULT_HDR = False +DEFAULT_PERF_MODE = False +DEFAULT_IMMEDIATE_MODE = False diff --git a/py_modules/lsfg_vk/dll_detection.py b/py_modules/lsfg_vk/dll_detection.py new file mode 100644 index 0000000..f1dace9 --- /dev/null +++ b/py_modules/lsfg_vk/dll_detection.py @@ -0,0 +1,120 @@ +""" +DLL detection service for Lossless Scaling. +""" + +import os +from pathlib import Path +from typing import Dict, Any + +from .base_service import BaseService +from .constants import ( + ENV_LSFG_DLL_PATH, ENV_XDG_DATA_HOME, ENV_HOME, + STEAM_COMMON_PATH, LOSSLESS_DLL_NAME +) +from .types import DllDetectionResponse + + +class DllDetectionService(BaseService): + """Service for detecting Lossless Scaling DLL""" + + def check_lossless_scaling_dll(self) -> DllDetectionResponse: + """Check if Lossless Scaling DLL is available at the expected paths + + Returns: + DllDetectionResponse with detection status and path information + """ + try: + # Check environment variable first + dll_path = self._check_env_dll_path() + if dll_path: + return dll_path + + # Check XDG_DATA_HOME path + xdg_path = self._check_xdg_data_home() + if xdg_path: + return xdg_path + + # Check HOME/.local/share path + home_path = self._check_home_local_share() + if home_path: + return home_path + + # DLL not found in any expected location + return { + "detected": False, + "path": None, + "source": None, + "message": "Lossless Scaling DLL not found in expected locations", + "error": None + } + + except Exception as e: + error_msg = f"Error checking Lossless Scaling DLL: {str(e)}" + self.log.error(error_msg) + return { + "detected": False, + "path": None, + "source": None, + "message": None, + "error": str(e) + } + + def _check_env_dll_path(self) -> DllDetectionResponse | None: + """Check LSFG_DLL_PATH environment variable + + Returns: + DllDetectionResponse if found, None otherwise + """ + dll_path = os.getenv(ENV_LSFG_DLL_PATH) + if dll_path and dll_path.strip(): + dll_path_obj = Path(dll_path.strip()) + if dll_path_obj.exists(): + self.log.info(f"Found DLL via {ENV_LSFG_DLL_PATH}: {dll_path_obj}") + return { + "detected": True, + "path": str(dll_path_obj), + "source": f"{ENV_LSFG_DLL_PATH} environment variable", + "message": None, + "error": None + } + return None + + def _check_xdg_data_home(self) -> DllDetectionResponse | None: + """Check XDG_DATA_HOME Steam directory + + Returns: + DllDetectionResponse if found, None otherwise + """ + data_dir = os.getenv(ENV_XDG_DATA_HOME) + if data_dir and data_dir.strip(): + dll_path = Path(data_dir.strip()) / "Steam" / STEAM_COMMON_PATH / LOSSLESS_DLL_NAME + if dll_path.exists(): + self.log.info(f"Found DLL via {ENV_XDG_DATA_HOME}: {dll_path}") + return { + "detected": True, + "path": str(dll_path), + "source": f"{ENV_XDG_DATA_HOME} Steam directory", + "message": None, + "error": None + } + return None + + def _check_home_local_share(self) -> DllDetectionResponse | None: + """Check HOME/.local/share Steam directory + + Returns: + DllDetectionResponse if found, None otherwise + """ + home_dir = os.getenv(ENV_HOME) + if home_dir and home_dir.strip(): + dll_path = Path(home_dir.strip()) / ".local" / "share" / "Steam" / STEAM_COMMON_PATH / LOSSLESS_DLL_NAME + if dll_path.exists(): + self.log.info(f"Found DLL via {ENV_HOME}/.local/share: {dll_path}") + return { + "detected": True, + "path": str(dll_path), + "source": f"{ENV_HOME}/.local/share Steam directory", + "message": None, + "error": None + } + return None diff --git a/py_modules/lsfg_vk/installation.py b/py_modules/lsfg_vk/installation.py new file mode 100644 index 0000000..1d0e96f --- /dev/null +++ b/py_modules/lsfg_vk/installation.py @@ -0,0 +1,225 @@ +""" +Installation service for lsfg-vk. +""" + +import os +import shutil +import zipfile +import tempfile +from pathlib import Path +from typing import Dict, Any + +from .base_service import BaseService +from .constants import ( + LIB_FILENAME, JSON_FILENAME, ZIP_FILENAME, BIN_DIR, + SO_EXT, JSON_EXT, LSFG_SCRIPT_TEMPLATE, + DEFAULT_MULTIPLIER, DEFAULT_FLOW_SCALE, DEFAULT_ENABLE_LSFG, + DEFAULT_HDR, DEFAULT_PERF_MODE, DEFAULT_IMMEDIATE_MODE +) +from .types import InstallationResponse, UninstallationResponse, InstallationCheckResponse + + +class InstallationService(BaseService): + """Service for handling lsfg-vk installation and uninstallation""" + + def __init__(self, logger=None): + super().__init__(logger) + + # File paths using constants + self.lib_file = self.local_lib_dir / LIB_FILENAME + self.json_file = self.local_share_dir / JSON_FILENAME + + def install(self) -> InstallationResponse: + """Install lsfg-vk by extracting the zip file to ~/.local + + Returns: + InstallationResponse with success status and message/error + """ + try: + # Get the path to the zip file - need to go up to plugin root from py_modules/lsfg_vk/ + plugin_dir = Path(__file__).parent.parent.parent + zip_path = plugin_dir / BIN_DIR / ZIP_FILENAME + + # Check if the zip file exists + if not zip_path.exists(): + error_msg = f"{ZIP_FILENAME} not found at {zip_path}" + self.log.error(error_msg) + return {"success": False, "error": error_msg, "message": ""} + + # Create directories if they don't exist + self._ensure_directories() + + # Extract and install files + self._extract_and_install_files(zip_path) + + # Create the lsfg script + self._create_lsfg_script() + + self.log.info("lsfg-vk installed successfully") + return {"success": True, "message": "lsfg-vk installed successfully", "error": None} + + except (OSError, zipfile.BadZipFile, shutil.Error) as e: + error_msg = f"Error installing lsfg-vk: {str(e)}" + self.log.error(error_msg) + return {"success": False, "error": str(e), "message": ""} + except Exception as e: + # Catch unexpected errors but log them separately + error_msg = f"Unexpected error installing lsfg-vk: {str(e)}" + self.log.error(error_msg) + return {"success": False, "error": str(e), "message": ""} + + def _extract_and_install_files(self, zip_path: Path) -> None: + """Extract zip file and install files to appropriate locations + + Args: + zip_path: Path to the zip file to extract + + Raises: + zipfile.BadZipFile: If zip file is corrupted + OSError: If file operations fail + """ + # Destination mapping for file types + dest_map = { + SO_EXT: self.local_lib_dir, + JSON_EXT: self.local_share_dir + } + + with zipfile.ZipFile(zip_path, 'r') as zip_ref: + with tempfile.TemporaryDirectory() as temp_dir: + temp_path = Path(temp_dir) + zip_ref.extractall(temp_path) + + # Process extracted files + for root, dirs, files in os.walk(temp_path): + root_path = Path(root) + for file in files: + src_file = root_path / file + file_path = Path(file) + + # Check if we know where this file type should go + dst_dir = dest_map.get(file_path.suffix) + if dst_dir: + dst_file = dst_dir / file + shutil.copy2(src_file, dst_file) + self.log.info(f"Copied {file} to {dst_file}") + + def _create_lsfg_script(self) -> None: + """Create the lsfg script in home directory with default configuration""" + script_content = LSFG_SCRIPT_TEMPLATE.format( + enable_lsfg="export ENABLE_LSFG=1" if DEFAULT_ENABLE_LSFG else "# export ENABLE_LSFG=1", + multiplier=DEFAULT_MULTIPLIER, + flow_scale=DEFAULT_FLOW_SCALE, + hdr="export LSFG_HDR=1" if DEFAULT_HDR else "# export LSFG_HDR=1", + perf_mode="export LSFG_PERF_MODE=1" if DEFAULT_PERF_MODE else "# export LSFG_PERF_MODE=1", + immediate_mode="export MESA_VK_WSI_PRESENT_MODE=immediate # - disable vsync" if DEFAULT_IMMEDIATE_MODE else "# export MESA_VK_WSI_PRESENT_MODE=immediate # - disable vsync" + ) + + # Use atomic write to prevent corruption + self._atomic_write(self.lsfg_script_path, script_content, 0o755) + self.log.info(f"Created executable lsfg script at {self.lsfg_script_path}") + + def check_installation(self) -> InstallationCheckResponse: + """Check if lsfg-vk is already installed + + Returns: + InstallationCheckResponse with installation status and file paths + """ + try: + lib_exists = self.lib_file.exists() + json_exists = self.json_file.exists() + script_exists = self.lsfg_script_path.exists() + + self.log.info(f"Installation check: lib={lib_exists}, json={json_exists}, script={script_exists}") + + return { + "installed": lib_exists and json_exists, + "lib_exists": lib_exists, + "json_exists": json_exists, + "script_exists": script_exists, + "lib_path": str(self.lib_file), + "json_path": str(self.json_file), + "script_path": str(self.lsfg_script_path), + "error": None + } + + except Exception as e: + error_msg = f"Error checking lsfg-vk installation: {str(e)}" + self.log.error(error_msg) + return { + "installed": False, + "lib_exists": False, + "json_exists": False, + "script_exists": False, + "lib_path": str(self.lib_file), + "json_path": str(self.json_file), + "script_path": str(self.lsfg_script_path), + "error": str(e) + } + + def uninstall(self) -> UninstallationResponse: + """Uninstall lsfg-vk by removing the installed files + + Returns: + UninstallationResponse with success status and removed files list + """ + try: + removed_files = [] + files_to_remove = [self.lib_file, self.json_file, self.lsfg_script_path] + + for file_path in files_to_remove: + if self._remove_if_exists(file_path): + removed_files.append(str(file_path)) + + if not removed_files: + return { + "success": True, + "message": "No lsfg-vk files found to remove", + "removed_files": None, + "error": None + } + + self.log.info("lsfg-vk uninstalled successfully") + return { + "success": True, + "message": f"lsfg-vk uninstalled successfully. Removed {len(removed_files)} files.", + "removed_files": removed_files, + "error": None + } + + except OSError as e: + error_msg = f"Error uninstalling lsfg-vk: {str(e)}" + self.log.error(error_msg) + return { + "success": False, + "message": "", + "removed_files": None, + "error": str(e) + } + + def cleanup_on_uninstall(self) -> None: + """Clean up lsfg-vk files when the plugin is uninstalled""" + try: + self.log.info("Checking for lsfg-vk files to clean up:") + self.log.info(f" Library file: {self.lib_file}") + self.log.info(f" JSON file: {self.json_file}") + self.log.info(f" lsfg script: {self.lsfg_script_path}") + + removed_files = [] + files_to_remove = [self.lib_file, self.json_file, self.lsfg_script_path] + + for file_path in files_to_remove: + try: + if self._remove_if_exists(file_path): + removed_files.append(str(file_path)) + except OSError as e: + self.log.error(f"Failed to remove {file_path}: {e}") + + if removed_files: + self.log.info(f"Cleaned up {len(removed_files)} lsfg-vk files during plugin uninstall: {removed_files}") + else: + self.log.info("No lsfg-vk files found to clean up during plugin uninstall") + + except Exception as e: + self.log.error(f"Error cleaning up lsfg-vk files during uninstall: {str(e)}") + import traceback + self.log.error(f"Traceback: {traceback.format_exc()}") diff --git a/py_modules/lsfg_vk/plugin.py b/py_modules/lsfg_vk/plugin.py new file mode 100644 index 0000000..000830f --- /dev/null +++ b/py_modules/lsfg_vk/plugin.py @@ -0,0 +1,159 @@ +""" +Main plugin class for the lsfg-vk Decky Loader plugin. + +This plugin provides services for installing and managing the lsfg-vk +Vulkan layer for Lossless Scaling frame generation on Steam Deck. +""" + +import os +from typing import Dict, Any + +from .installation import InstallationService +from .dll_detection import DllDetectionService +from .configuration import ConfigurationService + + +class Plugin: + """ + Main plugin class for lsfg-vk management. + + This class provides a unified interface for installation, configuration, + and DLL detection services. It implements the Decky Loader plugin lifecycle + methods (_main, _unload, _uninstall, _migration). + """ + + def __init__(self): + """Initialize the plugin with all necessary services""" + # Initialize services - they will use decky.logger by default + self.installation_service = InstallationService() + self.dll_detection_service = DllDetectionService() + self.configuration_service = ConfigurationService() + + # Installation methods + async def install_lsfg_vk(self) -> Dict[str, Any]: + """Install lsfg-vk by extracting the zip file to ~/.local + + Returns: + InstallationResponse dict with success status and message/error + """ + return self.installation_service.install() + + async def check_lsfg_vk_installed(self) -> Dict[str, Any]: + """Check if lsfg-vk is already installed + + Returns: + InstallationCheckResponse dict with installation status and paths + """ + return self.installation_service.check_installation() + + async def uninstall_lsfg_vk(self) -> Dict[str, Any]: + """Uninstall lsfg-vk by removing the installed files + + Returns: + UninstallationResponse dict with success status and removed files + """ + return self.installation_service.uninstall() + + # DLL detection methods + async def check_lossless_scaling_dll(self) -> Dict[str, Any]: + """Check if Lossless Scaling DLL is available at the expected paths + + Returns: + DllDetectionResponse dict with detection status and path info + """ + return self.dll_detection_service.check_lossless_scaling_dll() + + # Configuration methods + async def get_lsfg_config(self) -> Dict[str, Any]: + """Read current lsfg script configuration + + Returns: + ConfigurationResponse dict with current configuration or error + """ + return self.configuration_service.get_config() + + async def update_lsfg_config(self, enable_lsfg: bool, multiplier: int, flow_scale: float, + hdr: bool, perf_mode: bool, immediate_mode: bool) -> Dict[str, Any]: + """Update lsfg script configuration + + Args: + enable_lsfg: Whether to enable LSFG + multiplier: LSFG multiplier value (typically 2-4) + flow_scale: LSFG flow scale value (typically 0.5-2.0) + hdr: Whether to enable HDR + perf_mode: Whether to enable performance mode + immediate_mode: Whether to enable immediate present mode (disable vsync) + + Returns: + ConfigurationResponse dict with success status + """ + return self.configuration_service.update_config( + enable_lsfg, multiplier, flow_scale, hdr, perf_mode, immediate_mode + ) + + # Plugin lifecycle methods + async def _main(self): + """ + Asyncio-compatible long-running code, executed in a task when the plugin is loaded. + + This method is called by Decky Loader when the plugin starts up. + Currently just logs that the plugin has loaded successfully. + """ + import decky + decky.logger.info("Lossless Scaling VK plugin loaded!") + + async def _unload(self): + """ + Function called first during the unload process. + + This method is called by Decky Loader when the plugin is being unloaded. + Use this for cleanup that should happen when the plugin stops. + """ + import decky + decky.logger.info("Lossless Scaling VK plugin unloading") + + async def _uninstall(self): + """ + Function called after `_unload` during uninstall. + + This method is called by Decky Loader when the plugin is being uninstalled. + It automatically cleans up any lsfg-vk files that were installed. + """ + import decky + decky.logger.info("Lossless Scaling VK plugin uninstalled - starting cleanup") + + # Clean up lsfg-vk files when the plugin is uninstalled + self.installation_service.cleanup_on_uninstall() + + decky.logger.info("Lossless Scaling VK plugin uninstall cleanup completed") + + async def _migration(self): + """ + Migrations that should be performed before entering `_main()`. + + This method is called by Decky Loader for plugin migrations. + Currently migrates logs, settings, and runtime data from old locations. + """ + import decky + decky.logger.info("Running Lossless Scaling VK plugin migrations") + + # Migrate logs from old location + # ~/.config/decky-lossless-scaling-vk/lossless-scaling-vk.log -> decky.DECKY_LOG_DIR/lossless-scaling-vk.log + decky.migrate_logs(os.path.join(decky.DECKY_USER_HOME, + ".config", "decky-lossless-scaling-vk", "lossless-scaling-vk.log")) + + # Migrate settings from old locations + # ~/homebrew/settings/lossless-scaling-vk.json -> decky.DECKY_SETTINGS_DIR/lossless-scaling-vk.json + # ~/.config/decky-lossless-scaling-vk/ -> decky.DECKY_SETTINGS_DIR/ + decky.migrate_settings( + os.path.join(decky.DECKY_HOME, "settings", "lossless-scaling-vk.json"), + os.path.join(decky.DECKY_USER_HOME, ".config", "decky-lossless-scaling-vk")) + + # Migrate runtime data from old locations + # ~/homebrew/lossless-scaling-vk/ -> decky.DECKY_RUNTIME_DIR/ + # ~/.local/share/decky-lossless-scaling-vk/ -> decky.DECKY_RUNTIME_DIR/ + decky.migrate_runtime( + os.path.join(decky.DECKY_HOME, "lossless-scaling-vk"), + os.path.join(decky.DECKY_USER_HOME, ".local", "share", "decky-lossless-scaling-vk")) + + decky.logger.info("Lossless Scaling VK plugin migrations completed") diff --git a/py_modules/lsfg_vk/types.py b/py_modules/lsfg_vk/types.py new file mode 100644 index 0000000..f0ec892 --- /dev/null +++ b/py_modules/lsfg_vk/types.py @@ -0,0 +1,71 @@ +""" +Type definitions for the lsfg-vk plugin responses. +""" + +from typing import TypedDict, Optional, List + + +class BaseResponse(TypedDict): + """Base response structure""" + success: bool + + +class ErrorResponse(BaseResponse): + """Response structure for errors""" + error: str + + +class MessageResponse(BaseResponse): + """Response structure with message""" + message: str + + +class InstallationResponse(BaseResponse): + """Response for installation operations""" + message: str + error: Optional[str] + + +class UninstallationResponse(BaseResponse): + """Response for uninstallation operations""" + message: str + removed_files: Optional[List[str]] + error: Optional[str] + + +class InstallationCheckResponse(TypedDict): + """Response for installation check""" + installed: bool + lib_exists: bool + json_exists: bool + script_exists: bool + lib_path: str + json_path: str + script_path: str + error: Optional[str] + + +class DllDetectionResponse(TypedDict): + """Response for DLL detection""" + detected: bool + path: Optional[str] + source: Optional[str] + message: Optional[str] + error: Optional[str] + + +class ConfigurationData(TypedDict): + """Configuration data structure""" + enable_lsfg: bool + multiplier: int + flow_scale: float + hdr: bool + perf_mode: bool + immediate_mode: bool + + +class ConfigurationResponse(BaseResponse): + """Response for configuration operations""" + config: Optional[ConfigurationData] + message: Optional[str] + error: Optional[str] |
