Update to version v0.1.2.a.6 and enhance plugin security in terminal.py

- Updated index.html and version.txt to reflect the new release version v0.1.2.a.6.
- Introduced security measures for plugin loading in terminal.py, including AST validation and quarantine for unsafe plugins.
- Added a safe mode option to prevent plugin loading, enhancing overall security.
This commit is contained in:
2025-11-15 19:36:21 -05:00
parent edb1ada059
commit 16970b0866
3 changed files with 231 additions and 24 deletions

View File

@@ -46,7 +46,7 @@
<div class="hero__metrics"> <div class="hero__metrics">
<div class="metric"> <div class="metric">
<p class="metric__label">Current release</p> <p class="metric__label">Current release</p>
<p class="metric__value">v0.1.2.a.5</p> <p class="metric__value">v0.1.2.a.6</p>
</div> </div>
<div class="metric"> <div class="metric">
<p class="metric__label">Supported families</p> <p class="metric__label">Supported families</p>

View File

@@ -17,6 +17,7 @@ import threading
import json import json
import shlex import shlex
import signal import signal
import ast
from datetime import datetime from datetime import datetime
import urllib.request import urllib.request
import urllib.error import urllib.error
@@ -67,6 +68,12 @@ STATUS_BAR_COLORS = {
'black': ('40', '97'), 'black': ('40', '97'),
} }
# Protected command names that plugins cannot override
PROTECTED_COMMANDS = {
'ssh', 'sudo', 'su', 'cp', 'mv', 'rm', 'ls', 'cat', 'chmod', 'chown',
'history', 'zps', 'zdtt', 'pip', 'python', 'python3', 'curl', 'wget'
}
def _parse_os_release(): def _parse_os_release():
"""Return a dict of fields from /etc/os-release if available""" """Return a dict of fields from /etc/os-release if available"""
@@ -251,6 +258,7 @@ class ZDTTTerminal:
self.zdtt_dir = os.path.expanduser("~/.zdtt") self.zdtt_dir = os.path.expanduser("~/.zdtt")
self.history_file = os.path.expanduser("~/.zdtt_history") self.history_file = os.path.expanduser("~/.zdtt_history")
self.plugin_dir = os.path.join(self.zdtt_dir, "plugins") self.plugin_dir = os.path.join(self.zdtt_dir, "plugins")
self.quarantine_dir = os.path.join(self.zdtt_dir, "quarantine")
self.log_file = os.path.join(self.zdtt_dir, "plugin_errors.log") self.log_file = os.path.join(self.zdtt_dir, "plugin_errors.log")
self.banner_file = os.path.join(self.zdtt_dir, "banner.txt") self.banner_file = os.path.join(self.zdtt_dir, "banner.txt")
self.aliases_file = os.path.join(self.zdtt_dir, "aliases") self.aliases_file = os.path.join(self.zdtt_dir, "aliases")
@@ -262,6 +270,8 @@ class ZDTTTerminal:
self.plugin_command_names = set() self.plugin_command_names = set()
self.update_check_thread = None self.update_check_thread = None
self.resize_lock = threading.Lock() # Lock for resize operations self.resize_lock = threading.Lock() # Lock for resize operations
self.safe_mode = False # Safe mode flag (no plugins loaded)
self.quarantine_warnings = [] # Store warnings for plugins quarantined at startup
# Setup logging for plugins # Setup logging for plugins
self.setup_logging() self.setup_logging()
@@ -362,8 +372,9 @@ class ZDTTTerminal:
# Setup readline history and tab completion # Setup readline history and tab completion
self.setup_readline() self.setup_readline()
# Load plugins # Load plugins (unless in safe mode)
self.load_plugins() if not self.safe_mode:
self.load_plugins()
# Kick off async update check # Kick off async update check
self.start_update_check() self.start_update_check()
@@ -781,49 +792,221 @@ ZDTT Terminal v{self.version}
return options[state] return options[state]
return None return None
def _validate_plugin_ast(self, plugin_code, plugin_name):
"""
Validate plugin AST to ensure no top-level code execution.
Only allows: imports, function definitions, class definitions, and docstrings.
"""
try:
tree = ast.parse(plugin_code)
except SyntaxError as e:
raise ValueError(f"Plugin has syntax errors: {e}")
# Check module body directly - this is the most reliable way
if not isinstance(tree, ast.Module):
raise ValueError("Plugin must be a valid Python module")
for stmt in tree.body:
# Allow imports
if isinstance(stmt, (ast.Import, ast.ImportFrom)):
continue
# Allow function definitions
if isinstance(stmt, (ast.FunctionDef, ast.AsyncFunctionDef)):
continue
# Allow class definitions
if isinstance(stmt, ast.ClassDef):
continue
# Allow docstrings (Expr with string constant)
if isinstance(stmt, ast.Expr):
# Check if it's a string literal (docstring)
if isinstance(stmt.value, (ast.Constant, ast.Str)):
# For Python 3.8+, use ast.Constant; for older versions, use ast.Str
if isinstance(stmt.value, ast.Constant):
if isinstance(stmt.value.value, str):
continue
elif isinstance(stmt.value, ast.Str):
continue
# Anything else is forbidden (assignments, function calls, loops, etc.)
raise ValueError(
f"Plugin contains forbidden top-level statement: {stmt.__class__.__name__}. "
"Plugins can only contain imports, functions, classes, and docstrings. "
"No top-level code execution is allowed."
)
return True
def _move_to_quarantine(self, plugin_file, reason):
"""Move a plugin file to quarantine directory and log the reason."""
plugin_name = os.path.basename(plugin_file)
os.makedirs(self.quarantine_dir, exist_ok=True)
# Create unique filename if already exists
quarantine_path = os.path.join(self.quarantine_dir, plugin_name)
counter = 1
while os.path.exists(quarantine_path):
name, ext = os.path.splitext(plugin_name)
quarantine_path = os.path.join(self.quarantine_dir, f"{name}_{counter}{ext}")
counter += 1
try:
shutil.move(plugin_file, quarantine_path)
logging.warning(f"Plugin '{plugin_name}' quarantined: {reason}")
logging.warning(f"Moved to: {quarantine_path}")
return quarantine_path
except Exception as e:
logging.error(f"Failed to quarantine plugin '{plugin_name}': {e}")
return None
def _validate_plugin_commands(self, plugin_commands, plugin_name):
"""Validate that plugin commands don't override protected commands."""
violations = []
for cmd_name in plugin_commands.keys():
if cmd_name in PROTECTED_COMMANDS:
violations.append(cmd_name)
if violations:
raise ValueError(
f"Plugin attempted to override protected commands: {', '.join(violations)}. "
"This is a security violation and the plugin has been quarantined."
)
# Validate that all values are callable
for cmd_name, cmd_func in plugin_commands.items():
if not callable(cmd_func):
raise ValueError(
f"Plugin command '{cmd_name}' is not callable. "
"All commands must be functions."
)
return True
def load_plugins(self): def load_plugins(self):
"""Load plugin commands from the plugins directory""" """Load plugin commands from the plugins directory with security validation."""
if not os.path.exists(self.plugin_dir): if not os.path.exists(self.plugin_dir):
os.makedirs(self.plugin_dir, exist_ok=True) os.makedirs(self.plugin_dir, exist_ok=True)
return return
# Ensure quarantine directory exists
os.makedirs(self.quarantine_dir, exist_ok=True)
# Look for Python files in the plugins directory # Look for Python files in the plugins directory
plugin_files = glob.glob(os.path.join(self.plugin_dir, "*.py")) plugin_files = glob.glob(os.path.join(self.plugin_dir, "*.py"))
loaded_count = 0 loaded_count = 0
failed_count = 0 failed_count = 0
quarantined_count = 0
for plugin_file in plugin_files: for plugin_file in plugin_files:
plugin_name = os.path.basename(plugin_file)[:-3]
try: try:
# Get plugin name # Read plugin file
plugin_name = os.path.basename(plugin_file)[:-3]
# Read and execute plugin file
with open(plugin_file, 'r') as f: with open(plugin_file, 'r') as f:
plugin_code = f.read() plugin_code = f.read()
# Create a namespace for the plugin # Step 1: AST validation - check for top-level code
plugin_namespace = {} try:
exec(plugin_code, plugin_namespace) self._validate_plugin_ast(plugin_code, plugin_name)
except ValueError as e:
# Quarantine the plugin
self._move_to_quarantine(plugin_file, f"AST validation failed: {e}")
quarantined_count += 1
warning_msg = (
f"{self.COLOR_ERROR}🚨 SECURITY WARNING: Plugin '{plugin_name}' has been quarantined!{self.COLOR_RESET}\n"
f" Reason: {e}\n"
f" The plugin attempted unsafe operations and has been disabled.\n"
f" Check {self.quarantine_dir} for details.\n"
)
# Store warning to display after banner
self.quarantine_warnings.append(warning_msg)
continue
# Look for register_command function # Step 2: Sandboxed execution
if 'register_commands' in plugin_namespace: # Create a restricted namespace (sandbox)
plugin_commands = plugin_namespace['register_commands']() safe_builtins = {
if isinstance(plugin_commands, dict): # Only allow safe builtins
self.commands.update(plugin_commands) 'len': len, 'str': str, 'int': int, 'float': float,
self.plugin_command_names.update(plugin_commands.keys()) 'bool': bool, 'list': list, 'dict': dict, 'tuple': tuple,
loaded_count += 1 'set': set, 'frozenset': frozenset, 'range': range,
else: 'enumerate': enumerate, 'zip': zip, 'map': map, 'filter': filter,
raise ValueError("register_commands() must return a dictionary") 'sorted': sorted, 'reversed': reversed, 'min': min, 'max': max,
else: 'sum': sum, 'abs': abs, 'round': round, 'any': any, 'all': all,
'isinstance': isinstance, 'type': type, 'hasattr': hasattr,
'getattr': getattr, 'setattr': setattr, 'delattr': delattr,
'callable': callable, 'print': print, 'repr': repr,
# Exception classes (safe, required for normal Python code)
'BaseException': BaseException,
'Exception': Exception,
'ImportError': ImportError,
'NameError': NameError,
'ValueError': ValueError,
'TypeError': TypeError,
'RuntimeError': RuntimeError,
}
sandbox = {
'__builtins__': safe_builtins
}
# Execute plugin in sandbox
try:
exec(plugin_code, sandbox)
except Exception as e:
failed_count += 1
logging.error(f"Failed to execute plugin '{plugin_name}': {str(e)}")
logging.error(f"Plugin file: {plugin_file}")
continue
# Step 3: Check for register_commands function
if 'register_commands' not in sandbox:
raise ValueError("Plugin missing register_commands() function") raise ValueError("Plugin missing register_commands() function")
# Step 4: Call register_commands and validate return value
try:
plugin_commands = sandbox['register_commands']()
except Exception as e:
failed_count += 1
logging.error(f"register_commands() failed for plugin '{plugin_name}': {str(e)}")
continue
if not isinstance(plugin_commands, dict):
raise ValueError("register_commands() must return a dictionary")
# Step 5: Validate commands (protected names and callables)
try:
self._validate_plugin_commands(plugin_commands, plugin_name)
except ValueError as e:
# Quarantine the plugin
self._move_to_quarantine(plugin_file, f"Command validation failed: {e}")
quarantined_count += 1
warning_msg = (
f"{self.COLOR_ERROR}🚨 SECURITY WARNING: Plugin '{plugin_name}' has been quarantined!{self.COLOR_RESET}\n"
f" Reason: {e}\n"
f" The plugin attempted to override protected commands and has been disabled.\n"
f" Check {self.quarantine_dir} for details.\n"
)
# Store warning to display after banner
self.quarantine_warnings.append(warning_msg)
continue
# Step 6: All checks passed - register the commands
self.commands.update(plugin_commands)
self.plugin_command_names.update(plugin_commands.keys())
loaded_count += 1
except Exception as e: except Exception as e:
failed_count += 1 failed_count += 1
# Log error instead of printing
logging.error(f"Failed to load plugin '{plugin_name}': {str(e)}") logging.error(f"Failed to load plugin '{plugin_name}': {str(e)}")
logging.error(f"Plugin file: {plugin_file}") logging.error(f"Plugin file: {plugin_file}")
# Show brief status if there were failures # Store summary warning if any plugins were quarantined
if quarantined_count > 0:
summary_warning = (
f"{self.COLOR_ERROR}🚨 {quarantined_count} plugin(s) quarantined due to security violations!{self.COLOR_RESET}\n"
f" Check {self.quarantine_dir} for quarantined plugins.\n"
)
self.quarantine_warnings.append(summary_warning)
# Note: Individual warnings and summary are stored in self.quarantine_warnings
# They will be displayed after the banner in run() method
if failed_count > 0: if failed_count > 0:
print(f"{self.COLOR_WARNING}{failed_count} plugin(s) failed to load. Check ~/.zdtt/plugin_errors.log{self.COLOR_RESET}") print(f"{self.COLOR_WARNING}{failed_count} plugin(s) failed to load. Check ~/.zdtt/plugin_errors.log{self.COLOR_RESET}")
@@ -1070,8 +1253,16 @@ ZDTT Terminal v{self.version}
self.aliases.clear() self.aliases.clear()
self.load_aliases() self.load_aliases()
# Clear previous warnings
self.quarantine_warnings = []
# Reload plugins # Reload plugins
self.load_plugins() self.load_plugins()
# Display any new quarantine warnings
if self.quarantine_warnings:
print()
for warning in self.quarantine_warnings:
print(warning)
print()
print(f"{self.COLOR_BRIGHT_GREEN}✓ Plugins reloaded successfully!{self.COLOR_RESET}") print(f"{self.COLOR_BRIGHT_GREEN}✓ Plugins reloaded successfully!{self.COLOR_RESET}")
print() print()
return return
@@ -1838,6 +2029,13 @@ ZDTT Terminal v{self.version}
self.initialize_status_bar() self.initialize_status_bar()
self.display_banner() self.display_banner()
# Display security warnings for quarantined plugins (if any)
if self.quarantine_warnings:
print()
for warning in self.quarantine_warnings:
print(warning)
print()
# Main command loop # Main command loop
try: try:
while self.running: while self.running:
@@ -1854,10 +2052,19 @@ ZDTT Terminal v{self.version}
def main(): def main():
# Parse command line arguments
safe_mode = '--safe' in sys.argv
# Check system compatibility # Check system compatibility
distro = check_system_compatibility() distro = check_system_compatibility()
terminal = ZDTTTerminal(distro=distro) terminal = ZDTTTerminal(distro=distro)
terminal.safe_mode = safe_mode
if safe_mode:
print(f"{terminal.COLOR_WARNING}⚠ Safe mode enabled - plugins will not be loaded{terminal.COLOR_RESET}")
print()
terminal.run() terminal.run()

View File

@@ -1 +1 @@
0.1.2.a.5 0.1.2.a.6