From 16970b086607e51f091c8e558abd81bba60944d2 Mon Sep 17 00:00:00 2001 From: Zane V Date: Sat, 15 Nov 2025 19:36:21 -0500 Subject: [PATCH] Update to version v0.1.2.a.6 and enhance plugin security in terminal.py - Updated index.html and version.txt to reflect the new release version v0.1.2.a.6. - Introduced security measures for plugin loading in terminal.py, including AST validation and quarantine for unsafe plugins. - Added a safe mode option to prevent plugin loading, enhancing overall security. --- index.html | 2 +- terminal.py | 251 +++++++++++++++++++++++++++++++++++++++++++++++----- version.txt | 2 +- 3 files changed, 231 insertions(+), 24 deletions(-) diff --git a/index.html b/index.html index 08b894d..ebf0799 100644 --- a/index.html +++ b/index.html @@ -46,7 +46,7 @@

Current release

-

v0.1.2.a.5

+

v0.1.2.a.6

Supported families

diff --git a/terminal.py b/terminal.py index 61b9c70..7c7358a 100644 --- a/terminal.py +++ b/terminal.py @@ -17,6 +17,7 @@ import threading import json import shlex import signal +import ast from datetime import datetime import urllib.request import urllib.error @@ -67,6 +68,12 @@ STATUS_BAR_COLORS = { 'black': ('40', '97'), } +# Protected command names that plugins cannot override +PROTECTED_COMMANDS = { + 'ssh', 'sudo', 'su', 'cp', 'mv', 'rm', 'ls', 'cat', 'chmod', 'chown', + 'history', 'zps', 'zdtt', 'pip', 'python', 'python3', 'curl', 'wget' +} + def _parse_os_release(): """Return a dict of fields from /etc/os-release if available""" @@ -251,6 +258,7 @@ class ZDTTTerminal: self.zdtt_dir = os.path.expanduser("~/.zdtt") self.history_file = os.path.expanduser("~/.zdtt_history") self.plugin_dir = os.path.join(self.zdtt_dir, "plugins") + self.quarantine_dir = os.path.join(self.zdtt_dir, "quarantine") self.log_file = os.path.join(self.zdtt_dir, "plugin_errors.log") self.banner_file = os.path.join(self.zdtt_dir, "banner.txt") self.aliases_file = os.path.join(self.zdtt_dir, "aliases") @@ -262,6 +270,8 @@ class ZDTTTerminal: self.plugin_command_names = set() self.update_check_thread = None self.resize_lock = threading.Lock() # Lock for resize operations + self.safe_mode = False # Safe mode flag (no plugins loaded) + self.quarantine_warnings = [] # Store warnings for plugins quarantined at startup # Setup logging for plugins self.setup_logging() @@ -362,8 +372,9 @@ class ZDTTTerminal: # Setup readline history and tab completion self.setup_readline() - # Load plugins - self.load_plugins() + # Load plugins (unless in safe mode) + if not self.safe_mode: + self.load_plugins() # Kick off async update check self.start_update_check() @@ -781,49 +792,221 @@ ZDTT Terminal v{self.version} return options[state] return None + def _validate_plugin_ast(self, plugin_code, plugin_name): + """ + Validate plugin AST to ensure no top-level code execution. + Only allows: imports, function definitions, class definitions, and docstrings. + """ + try: + tree = ast.parse(plugin_code) + except SyntaxError as e: + raise ValueError(f"Plugin has syntax errors: {e}") + + # Check module body directly - this is the most reliable way + if not isinstance(tree, ast.Module): + raise ValueError("Plugin must be a valid Python module") + + for stmt in tree.body: + # Allow imports + if isinstance(stmt, (ast.Import, ast.ImportFrom)): + continue + # Allow function definitions + if isinstance(stmt, (ast.FunctionDef, ast.AsyncFunctionDef)): + continue + # Allow class definitions + if isinstance(stmt, ast.ClassDef): + continue + # Allow docstrings (Expr with string constant) + if isinstance(stmt, ast.Expr): + # Check if it's a string literal (docstring) + if isinstance(stmt.value, (ast.Constant, ast.Str)): + # For Python 3.8+, use ast.Constant; for older versions, use ast.Str + if isinstance(stmt.value, ast.Constant): + if isinstance(stmt.value.value, str): + continue + elif isinstance(stmt.value, ast.Str): + continue + # Anything else is forbidden (assignments, function calls, loops, etc.) + raise ValueError( + f"Plugin contains forbidden top-level statement: {stmt.__class__.__name__}. " + "Plugins can only contain imports, functions, classes, and docstrings. " + "No top-level code execution is allowed." + ) + + return True + + def _move_to_quarantine(self, plugin_file, reason): + """Move a plugin file to quarantine directory and log the reason.""" + plugin_name = os.path.basename(plugin_file) + os.makedirs(self.quarantine_dir, exist_ok=True) + + # Create unique filename if already exists + quarantine_path = os.path.join(self.quarantine_dir, plugin_name) + counter = 1 + while os.path.exists(quarantine_path): + name, ext = os.path.splitext(plugin_name) + quarantine_path = os.path.join(self.quarantine_dir, f"{name}_{counter}{ext}") + counter += 1 + + try: + shutil.move(plugin_file, quarantine_path) + logging.warning(f"Plugin '{plugin_name}' quarantined: {reason}") + logging.warning(f"Moved to: {quarantine_path}") + return quarantine_path + except Exception as e: + logging.error(f"Failed to quarantine plugin '{plugin_name}': {e}") + return None + + def _validate_plugin_commands(self, plugin_commands, plugin_name): + """Validate that plugin commands don't override protected commands.""" + violations = [] + for cmd_name in plugin_commands.keys(): + if cmd_name in PROTECTED_COMMANDS: + violations.append(cmd_name) + + if violations: + raise ValueError( + f"Plugin attempted to override protected commands: {', '.join(violations)}. " + "This is a security violation and the plugin has been quarantined." + ) + + # Validate that all values are callable + for cmd_name, cmd_func in plugin_commands.items(): + if not callable(cmd_func): + raise ValueError( + f"Plugin command '{cmd_name}' is not callable. " + "All commands must be functions." + ) + + return True + def load_plugins(self): - """Load plugin commands from the plugins directory""" + """Load plugin commands from the plugins directory with security validation.""" if not os.path.exists(self.plugin_dir): os.makedirs(self.plugin_dir, exist_ok=True) return + + # Ensure quarantine directory exists + os.makedirs(self.quarantine_dir, exist_ok=True) # Look for Python files in the plugins directory plugin_files = glob.glob(os.path.join(self.plugin_dir, "*.py")) loaded_count = 0 failed_count = 0 + quarantined_count = 0 for plugin_file in plugin_files: + plugin_name = os.path.basename(plugin_file)[:-3] + try: - # Get plugin name - plugin_name = os.path.basename(plugin_file)[:-3] - - # Read and execute plugin file + # Read plugin file with open(plugin_file, 'r') as f: plugin_code = f.read() - # Create a namespace for the plugin - plugin_namespace = {} - exec(plugin_code, plugin_namespace) + # Step 1: AST validation - check for top-level code + try: + self._validate_plugin_ast(plugin_code, plugin_name) + except ValueError as e: + # Quarantine the plugin + self._move_to_quarantine(plugin_file, f"AST validation failed: {e}") + quarantined_count += 1 + warning_msg = ( + f"{self.COLOR_ERROR}🚨 SECURITY WARNING: Plugin '{plugin_name}' has been quarantined!{self.COLOR_RESET}\n" + f" Reason: {e}\n" + f" The plugin attempted unsafe operations and has been disabled.\n" + f" Check {self.quarantine_dir} for details.\n" + ) + # Store warning to display after banner + self.quarantine_warnings.append(warning_msg) + continue - # Look for register_command function - if 'register_commands' in plugin_namespace: - plugin_commands = plugin_namespace['register_commands']() - if isinstance(plugin_commands, dict): - self.commands.update(plugin_commands) - self.plugin_command_names.update(plugin_commands.keys()) - loaded_count += 1 - else: - raise ValueError("register_commands() must return a dictionary") - else: + # Step 2: Sandboxed execution + # Create a restricted namespace (sandbox) + safe_builtins = { + # Only allow safe builtins + 'len': len, 'str': str, 'int': int, 'float': float, + 'bool': bool, 'list': list, 'dict': dict, 'tuple': tuple, + 'set': set, 'frozenset': frozenset, 'range': range, + 'enumerate': enumerate, 'zip': zip, 'map': map, 'filter': filter, + 'sorted': sorted, 'reversed': reversed, 'min': min, 'max': max, + 'sum': sum, 'abs': abs, 'round': round, 'any': any, 'all': all, + 'isinstance': isinstance, 'type': type, 'hasattr': hasattr, + 'getattr': getattr, 'setattr': setattr, 'delattr': delattr, + 'callable': callable, 'print': print, 'repr': repr, + # Exception classes (safe, required for normal Python code) + 'BaseException': BaseException, + 'Exception': Exception, + 'ImportError': ImportError, + 'NameError': NameError, + 'ValueError': ValueError, + 'TypeError': TypeError, + 'RuntimeError': RuntimeError, + } + sandbox = { + '__builtins__': safe_builtins + } + + # Execute plugin in sandbox + try: + exec(plugin_code, sandbox) + except Exception as e: + failed_count += 1 + logging.error(f"Failed to execute plugin '{plugin_name}': {str(e)}") + logging.error(f"Plugin file: {plugin_file}") + continue + + # Step 3: Check for register_commands function + if 'register_commands' not in sandbox: raise ValueError("Plugin missing register_commands() function") + + # Step 4: Call register_commands and validate return value + try: + plugin_commands = sandbox['register_commands']() + except Exception as e: + failed_count += 1 + logging.error(f"register_commands() failed for plugin '{plugin_name}': {str(e)}") + continue + + if not isinstance(plugin_commands, dict): + raise ValueError("register_commands() must return a dictionary") + + # Step 5: Validate commands (protected names and callables) + try: + self._validate_plugin_commands(plugin_commands, plugin_name) + except ValueError as e: + # Quarantine the plugin + self._move_to_quarantine(plugin_file, f"Command validation failed: {e}") + quarantined_count += 1 + warning_msg = ( + f"{self.COLOR_ERROR}🚨 SECURITY WARNING: Plugin '{plugin_name}' has been quarantined!{self.COLOR_RESET}\n" + f" Reason: {e}\n" + f" The plugin attempted to override protected commands and has been disabled.\n" + f" Check {self.quarantine_dir} for details.\n" + ) + # Store warning to display after banner + self.quarantine_warnings.append(warning_msg) + continue + + # Step 6: All checks passed - register the commands + self.commands.update(plugin_commands) + self.plugin_command_names.update(plugin_commands.keys()) + loaded_count += 1 except Exception as e: failed_count += 1 - # Log error instead of printing logging.error(f"Failed to load plugin '{plugin_name}': {str(e)}") logging.error(f"Plugin file: {plugin_file}") - # Show brief status if there were failures + # Store summary warning if any plugins were quarantined + if quarantined_count > 0: + summary_warning = ( + f"{self.COLOR_ERROR}🚨 {quarantined_count} plugin(s) quarantined due to security violations!{self.COLOR_RESET}\n" + f" Check {self.quarantine_dir} for quarantined plugins.\n" + ) + self.quarantine_warnings.append(summary_warning) + + # Note: Individual warnings and summary are stored in self.quarantine_warnings + # They will be displayed after the banner in run() method if failed_count > 0: print(f"{self.COLOR_WARNING}⚠ {failed_count} plugin(s) failed to load. Check ~/.zdtt/plugin_errors.log{self.COLOR_RESET}") @@ -1070,8 +1253,16 @@ ZDTT Terminal v{self.version} self.aliases.clear() self.load_aliases() + # Clear previous warnings + self.quarantine_warnings = [] # Reload plugins self.load_plugins() + # Display any new quarantine warnings + if self.quarantine_warnings: + print() + for warning in self.quarantine_warnings: + print(warning) + print() print(f"{self.COLOR_BRIGHT_GREEN}✓ Plugins reloaded successfully!{self.COLOR_RESET}") print() return @@ -1838,6 +2029,13 @@ ZDTT Terminal v{self.version} self.initialize_status_bar() self.display_banner() + # Display security warnings for quarantined plugins (if any) + if self.quarantine_warnings: + print() + for warning in self.quarantine_warnings: + print(warning) + print() + # Main command loop try: while self.running: @@ -1854,10 +2052,19 @@ ZDTT Terminal v{self.version} def main(): + # Parse command line arguments + safe_mode = '--safe' in sys.argv + # Check system compatibility distro = check_system_compatibility() terminal = ZDTTTerminal(distro=distro) + terminal.safe_mode = safe_mode + + if safe_mode: + print(f"{terminal.COLOR_WARNING}⚠ Safe mode enabled - plugins will not be loaded{terminal.COLOR_RESET}") + print() + terminal.run() diff --git a/version.txt b/version.txt index 73240f2..3020f0c 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -0.1.2.a.5 \ No newline at end of file +0.1.2.a.6 \ No newline at end of file