diff --git a/terminal.py b/terminal.py index cc867cf..6a94d71 100644 --- a/terminal.py +++ b/terminal.py @@ -23,39 +23,18 @@ import urllib.request import urllib.error import time as time_module - -SUPPORTED_DEBIAN_IDS = { - 'debian', - 'ubuntu', - 'linuxmint', - 'mint', - 'pop', - 'pop-os', - 'pop_os', - 'elementary', - 'zorin', - 'kali', - 'parrot', - 'mx', - 'mx-linux', - 'deepin', - 'peppermint', - 'raspbian', - 'neon', -} - -SUPPORTED_ARCH_IDS = { - 'arch', - 'archlinux', - 'manjaro', - 'endeavouros', - 'endeavour', - 'arcolinux', - 'garuda', - 'artix', - 'blackarch', - 'chakra', -} +from zdtt.plugins import ( + PROTECTED_COMMANDS as PLUGIN_PROTECTED_COMMANDS, + validate_plugin_ast as plugins_validate_ast, + validate_plugin_commands as plugins_validate_commands, + move_to_quarantine as plugins_move_to_quarantine, +) +from zdtt.config import ( + check_system_compatibility, +) +from zdtt import status_bar as sb +from zdtt.shell import execute_system_command as shell_execute +from zdtt.ui import display_banner as ui_display_banner, get_prompt as ui_get_prompt STATUS_BAR_COLORS = { 'blue': ('44', '97'), @@ -68,182 +47,15 @@ STATUS_BAR_COLORS = { 'black': ('40', '97'), } -# Protected command names that plugins cannot override -PROTECTED_COMMANDS = { - 'ssh', 'sudo', 'su', 'cp', 'mv', 'rm', 'ls', 'cat', 'chmod', 'chown', - 'history', 'zps', 'zdtt', 'pip', 'python', 'python3', 'curl', 'wget' -} +# Protected command names that plugins cannot override (sourced from plugins module) +PROTECTED_COMMANDS = PLUGIN_PROTECTED_COMMANDS + +# Helper for status bar module to get current color codes +def _STATUS_BAR_COLORS_LOOKUP(self): + return STATUS_BAR_COLORS.get(self.status_bar_color, ('44', '97')) -def _parse_os_release(): - """Return a dict of fields from /etc/os-release if available""" - data = {} - try: - with open('/etc/os-release', 'r') as f: - for line in f: - line = line.strip() - if not line or line.startswith('#') or '=' not in line: - continue - key, value = line.split('=', 1) - value = value.strip().strip('"') - data[key] = value - except FileNotFoundError: - pass - return data - - -def _collect_tokens(*values): - """Normalize distro identifiers for comparison""" - tokens = set() - for value in values: - if not value: - continue - normalized = value.replace('"', '').strip().lower() - if not normalized: - continue - # Keep the raw normalized value plus its dashed/underscored variants split - tokens.add(normalized) - delimiters_replaced = normalized.replace('-', ' ').replace('_', ' ') - for part in delimiters_replaced.split(): - if part: - tokens.add(part) - return tokens - - -def _detect_supported_distro(): - """Return distro identifier: 'debian', 'arch', or 'other'""" - if os.path.exists('/etc/debian_version'): - return 'debian' - - arch_markers = ( - '/etc/arch-release', - '/etc/artix-release', - ) - if any(os.path.exists(path) for path in arch_markers): - return 'arch' - - os_release = _parse_os_release() - tokens = _collect_tokens(os_release.get('ID'), os_release.get('ID_LIKE')) - - if tokens & SUPPORTED_DEBIAN_IDS: - return 'debian' - if tokens & SUPPORTED_ARCH_IDS: - return 'arch' - - # Fallback to package manager detection - if shutil.which('apt-get'): - return 'debian' - if shutil.which('pacman'): - return 'arch' - - return 'other' - - -def _prompt_distro_override(detected_distro): - """Allow user to override detected distro.""" - label_map = { - 'debian': "Debian-based", - 'arch': "Arch-based", - 'other': "Unsupported/Other", - } - print("=" * 60) - print(f"Detected distribution: {label_map.get(detected_distro, 'Unknown')}") - print("If this is incorrect, enter one of: debian / arch / other.") - print("Press Enter to accept the detected value.") - override = input("Override distribution (leave blank to keep): ").strip().lower() - - if override in ('debian', 'arch', 'other'): - return override - - if override: - print(f"Unknown override '{override}'. Using detected value.") - - return detected_distro - - -def _load_saved_distro(): - """Load saved distro preference from config file.""" - config_file = os.path.expanduser("~/.zdtt/config.json") - try: - with open(config_file, 'r') as f: - data = json.load(f) - saved_distro = data.get('distro') - if saved_distro in ('debian', 'arch', 'other'): - return saved_distro - except (FileNotFoundError, json.JSONDecodeError, KeyError): - pass - return None - -def _save_distro_preference(distro): - """Save distro preference to config file.""" - config_file = os.path.expanduser("~/.zdtt/config.json") - os.makedirs(os.path.dirname(config_file), exist_ok=True) - - data = {} - try: - with open(config_file, 'r') as f: - data = json.load(f) - except (FileNotFoundError, json.JSONDecodeError): - data = {} - - data['distro'] = distro - - with open(config_file, 'w') as f: - json.dump(data, f, indent=2) - -def check_system_compatibility(): - """Detect supported distributions and warn when unsupported""" - # Check for saved distro preference first - saved_distro = _load_saved_distro() - if saved_distro: - # Use saved preference, skip prompts - return saved_distro - - # Check if running on Linux - if sys.platform != 'linux': - print("=" * 60) - print("⚠️ WARNING: ZDTT Terminal is designed for Linux systems") - print(f" Detected platform: {sys.platform}") - print("=" * 60) - print("ZDTT may not work correctly on your system.") - print("Some features may be unavailable or broken.") - print() - response = input("Continue anyway? (yes/no): ").strip().lower() - if response != 'yes': - print("Installation cancelled.") - sys.exit(0) - distro = 'other' - _save_distro_preference(distro) - return distro - - # Detect supported distributions - distro = _detect_supported_distro() - - if distro not in ('debian', 'arch'): - # Unsupported distribution - print("=" * 60) - print("⚠️ WARNING: Unsupported Distribution Detected") - print("=" * 60) - print("ZDTT Terminal is optimized for Debian-based and Arch Linux systems.") - print() - print("Running on your current system may result in:") - print(" • Some commands may not work as expected") - print(" • Auto-install features may fail") - print(" • Reduced plugin compatibility") - print(" • Package management commands unavailable") - print() - response = input("Continue installation? (yes/no): ").strip().lower() - if response != 'yes': - print("Installation cancelled.") - sys.exit(0) - - # Offer override regardless of detection - distro = _prompt_distro_override(distro) - - # Save the selected distro preference - _save_distro_preference(distro) - - return distro +## moved to zdtt.config class ZDTTTerminal: @@ -273,6 +85,8 @@ class ZDTTTerminal: self.safe_mode = False # Safe mode flag (no plugins loaded) self.quarantine_warnings = [] # Store warnings for plugins quarantined at startup self.trusted_plugins = set() # Plugins allowed to use imports + # Expose STATUS_BAR_COLORS lookup to status bar module + self.STATUS_BAR_COLORS_LOOKUP = lambda: _STATUS_BAR_COLORS_LOOKUP(self) # Setup logging for plugins self.setup_logging() @@ -475,272 +289,49 @@ class ZDTTTerminal: def display_banner(self): """Display the ZDTT ASCII art banner (or custom banner if available)""" - print() - - # Check terminal size to see if banner will fit - try: - term_size = shutil.get_terminal_size() - # Banner is 44 chars wide and 11 lines tall (including version) - # Add extra space for compatibility warning if needed - min_height = 13 if not self.is_supported else 11 - min_width = 44 - - if term_size.columns < min_width or term_size.lines < min_height: - # Terminal too small, skip banner and just show minimal header - print(f"ZDTT Terminal v{self.version}") - if not self.is_supported: - print("⚠️ Unsupported system - limited support") - print() - return - except Exception: - # If we can't get terminal size, display the banner anyway - pass - - # Check for custom banner - if os.path.exists(self.banner_file): - try: - with open(self.banner_file, 'r') as f: - custom_banner = f.read() - # Add version at the bottom if not already present - if '{version}' in custom_banner: - custom_banner = custom_banner.replace('{version}', self.version) - print(custom_banner) - # Show warning for unsupported systems - if not self.is_supported: - self._show_compatibility_warning() - return - except Exception as e: - logging.error(f"Failed to load custom banner: {e}") - # Fall through to default banner - - # Default banner - banner = f""" -░█████████ ░███████ ░██████████░██████████ - ░██ ░██ ░██ ░██ ░██ - ░██ ░██ ░██ ░██ ░██ - ░███ ░██ ░██ ░██ ░██ - ░██ ░██ ░██ ░██ ░██ - ░██ ░██ ░██ ░██ ░██ -░█████████ ░███████ ░██ ░██ - - -ZDTT Terminal v{self.version} -""" - print(banner) - - # Show warning for unsupported systems after banner - if not self.is_supported: - self._show_compatibility_warning() + ui_display_banner(self) def _show_compatibility_warning(self): """Show compatibility warning for unsupported systems""" - if self.is_supported: - return - - print() - print("⚠️ Running on unsupported system - limited support") - print(" Tested on Debian-based and Arch Linux distributions.") - print() + from zdtt.ui import _show_compatibility_warning as ui_warn + ui_warn(self) def initialize_status_bar(self): """Reserve the first terminal row and start the status bar thread.""" - self._set_scroll_region() - self._start_status_bar_thread() - self._render_status_bar() + sb.initialize_status_bar(self) def shutdown_status_bar(self): """Stop the status bar thread and release terminal state.""" - self.status_bar_stop_event.set() - if self.status_bar_thread and self.status_bar_thread.is_alive(): - self.status_bar_thread.join(timeout=0.5) - self.status_bar_thread = None - self._reset_scroll_region() + sb.shutdown_status_bar(self) def _start_status_bar_thread(self): - if self.status_bar_thread and self.status_bar_thread.is_alive(): - return - self.status_bar_stop_event.clear() - self.status_bar_thread = threading.Thread( - target=self._status_bar_loop, - name="ZDTTStatusBar", - daemon=True, - ) - self.status_bar_thread.start() + sb.start_status_bar_thread(self) def _status_bar_loop(self): - while not self.status_bar_stop_event.is_set(): - self._render_status_bar() - if self.status_bar_stop_event.wait(2): - break + sb.status_bar_loop(self) def _render_status_bar(self): """Render a single-line status bar with branding and time.""" - try: - # Get terminal size first to ensure we don't write beyond bounds - try: - term_size = shutil.get_terminal_size() - max_width = term_size.columns - except Exception: - max_width = 80 # Fallback - - bar_text = self._build_status_bar_text() - - # Ensure bar_text doesn't exceed terminal width (safety check) - # Count visible characters (approximate - ANSI codes don't count) - # This is a rough check, but better than nothing - if len(bar_text) > max_width * 3: # Allow for ANSI codes (rough estimate) - # Rebuild with safer width - bar_text = self._build_status_bar_text() - - sys.stdout.write("\033[s") # Save cursor position - sys.stdout.write("\033[1;1H") # Move to first row, first column - sys.stdout.write("\033[2K") # Clear the entire line - sys.stdout.write("\033[0m") # Reset all attributes - sys.stdout.write(bar_text) - sys.stdout.write("\033[0m") # Ensure reset at end - # Move cursor to end of line to prevent wrapping issues - sys.stdout.write(f"\033[{max_width}G") # Move to column max_width - sys.stdout.write("\033[u") # Restore cursor - sys.stdout.flush() - except Exception: - # Fallback: just skip rendering if there's an error - pass + sb.render_status_bar(self) def _build_status_bar_text(self): """Render a single-line status bar with enhanced branding and time.""" - left_text = f"{self.COLOR_BOLD}ZDTT{self.COLOR_RESET} by {self.COLOR_BOLD}ZaneDev{self.COLOR_RESET}" - time_str = datetime.now().strftime("%I:%M %p") - plain_left = "ZDTT by ZaneDev" - plain_time = time_str - - try: - # Always get fresh terminal size to handle resizes - term_size = shutil.get_terminal_size() - width = term_size.columns - # Safety: ensure width is at least 1 - width = max(1, width) - except Exception: - # Fallback to minimum width if we can't get terminal size - width = max(1, len(plain_left) + len(plain_time) + 6) - - # Calculate the minimum content width (plain text only, no ANSI codes) - # Format: " ZDTT by ZaneDev | TIME " - min_content_width = len(plain_left) + len(plain_time) + 5 # 5 = spaces + separator - - # Calculate padding to fill the line - if width < min_content_width: - # Terminal too narrow, use minimum padding - padding = 0 - else: - padding = width - min_content_width - - # Build the content (plain text calculation) - separator = f"{self.COLOR_DIM}│{self.COLOR_RESET}" - bar_content = f" {left_text} {' ' * padding}{separator} {self.COLOR_BRIGHT_WHITE}{time_str}{self.COLOR_RESET} " - - # Calculate actual display length (plain text only) - actual_display_len = len(plain_left) + len(plain_time) + padding + 5 - - # Ensure we fill exactly to terminal width (but never exceed it) - if actual_display_len < width: - # Add trailing spaces to fill exactly to width - trailing_spaces = width - actual_display_len - bar_content = bar_content.rstrip() + ' ' * trailing_spaces - elif actual_display_len > width: - # We exceeded width, recalculate with less padding - padding = max(0, width - min_content_width) - bar_content = f" {left_text} {' ' * padding}{separator} {self.COLOR_BRIGHT_WHITE}{time_str}{self.COLOR_RESET} " - actual_display_len = len(plain_left) + len(plain_time) + padding + 5 - if actual_display_len < width: - trailing_spaces = width - actual_display_len - bar_content = bar_content.rstrip() + ' ' * trailing_spaces - else: - # Still too wide, trim the time if necessary - if width < len(plain_left) + 10: - # Very narrow terminal, just show minimal content - bar_content = f" {left_text} {separator} {self.COLOR_BRIGHT_WHITE}{time_str[:8]}{self.COLOR_RESET} " - bar_content = bar_content[:width] if len(bar_content) > width else bar_content - - # Final safety check: ensure we never exceed terminal width - # This is approximate since ANSI codes don't count, but better than nothing - bg_code, fg_code = STATUS_BAR_COLORS.get(self.status_bar_color, ('44', '97')) - result = f"\033[{bg_code}m\033[{fg_code}m{bar_content}\033[0m" - - # If the result is suspiciously long, truncate it - # (rough heuristic: ANSI codes add ~30-50 chars, so if result > width*2, it's probably wrong) - if len(result) > width * 2: - # Emergency fallback: simple status bar - simple_bar = f" ZDTT by ZaneDev | {time_str} " - simple_bar = simple_bar[:width] if len(simple_bar) > width else simple_bar.ljust(width) - result = f"\033[{bg_code}m\033[{fg_code}m{simple_bar}\033[0m" - - return result + return sb.build_status_bar_text(self) def _set_scroll_region(self): """Reserve the top row for the status bar.""" - try: - rows = shutil.get_terminal_size().lines - rows = max(rows, 2) - sys.stdout.write(f"\033[2;{rows}r") - sys.stdout.write("\033[1;1H") - sys.stdout.write("\033[2K") - sys.stdout.write("\033[2;1H") - sys.stdout.flush() - self.scroll_region_set = True - except Exception: - self.scroll_region_set = False + sb.set_scroll_region(self) def _reset_scroll_region(self): """Restore default scrolling behavior.""" - if not self.scroll_region_set: - return - sys.stdout.write("\033[r") - sys.stdout.flush() - self.scroll_region_set = False + sb.reset_scroll_region(self) def _handle_resize(self, signum=None, frame=None): """Handle terminal resize event (SIGWINCH).""" - # Use a lock to prevent race conditions - if not self.resize_lock.acquire(blocking=False): - # If we can't acquire the lock immediately, skip this resize - # (another resize is already being handled) - return - - try: - # Small delay to let terminal settle after resize - time_module.sleep(0.05) - - # Reset scroll region first to clear any corrupted state - self._reset_scroll_region() - - # Update scroll region with new terminal size - self._set_scroll_region() - - # Clear the status bar line completely before redrawing - try: - sys.stdout.write("\033[1;1H") # Move to first row - sys.stdout.write("\033[2K") # Clear the entire line - sys.stdout.write("\033[0m") # Reset attributes - sys.stdout.flush() - except Exception: - pass - - # Force immediate status bar refresh - self._render_status_bar() - - # Ensure cursor is in a safe position - try: - term_size = shutil.get_terminal_size() - sys.stdout.write(f"\033[{term_size.lines};1H") # Move to last line, first column - sys.stdout.flush() - except Exception: - pass - - except Exception: - # Silently fail if resize handling fails - pass - finally: - self.resize_lock.release() + sb.handle_resize(self, signum, frame) + + def _spawn_thread(self, target, name): + return threading.Thread(target=target, name=name, daemon=True) def setup_readline(self): """Setup readline for history and tab completion""" @@ -800,92 +391,19 @@ ZDTT Terminal v{self.version} return None def _validate_plugin_ast(self, plugin_code, plugin_name): - """ - Validate plugin AST to ensure no top-level code execution. - Only allows: imports, function definitions, class definitions, and docstrings. - """ - try: - tree = ast.parse(plugin_code) - except SyntaxError as e: - raise ValueError(f"Plugin has syntax errors: {e}") - - # Check module body directly - this is the most reliable way - if not isinstance(tree, ast.Module): - raise ValueError("Plugin must be a valid Python module") - - for stmt in tree.body: - # Allow imports - if isinstance(stmt, (ast.Import, ast.ImportFrom)): - continue - # Allow function definitions - if isinstance(stmt, (ast.FunctionDef, ast.AsyncFunctionDef)): - continue - # Allow class definitions - if isinstance(stmt, ast.ClassDef): - continue - # Allow docstrings (Expr with string constant) - if isinstance(stmt, ast.Expr): - # Check if it's a string literal (docstring) - if isinstance(stmt.value, (ast.Constant, ast.Str)): - # For Python 3.8+, use ast.Constant; for older versions, use ast.Str - if isinstance(stmt.value, ast.Constant): - if isinstance(stmt.value.value, str): - continue - elif isinstance(stmt.value, ast.Str): - continue - # Anything else is forbidden (assignments, function calls, loops, etc.) - raise ValueError( - f"Plugin contains forbidden top-level statement: {stmt.__class__.__name__}. " - "Plugins can only contain imports, functions, classes, and docstrings. " - "No top-level code execution is allowed." - ) - - return True + # Delegate to plugins module + return plugins_validate_ast(plugin_code, plugin_name) def _move_to_quarantine(self, plugin_file, reason): - """Move a plugin file to quarantine directory and log the reason.""" - plugin_name = os.path.basename(plugin_file) - os.makedirs(self.quarantine_dir, exist_ok=True) - - # Create unique filename if already exists - quarantine_path = os.path.join(self.quarantine_dir, plugin_name) - counter = 1 - while os.path.exists(quarantine_path): - name, ext = os.path.splitext(plugin_name) - quarantine_path = os.path.join(self.quarantine_dir, f"{name}_{counter}{ext}") - counter += 1 - - try: - shutil.move(plugin_file, quarantine_path) - logging.warning(f"Plugin '{plugin_name}' quarantined: {reason}") - logging.warning(f"Moved to: {quarantine_path}") - return quarantine_path - except Exception as e: - logging.error(f"Failed to quarantine plugin '{plugin_name}': {e}") - return None + # Delegate to plugins module + path = plugins_move_to_quarantine(plugin_file, self.quarantine_dir, logging) + if path: + logging.warning(f"Reason: {reason}") + return path def _validate_plugin_commands(self, plugin_commands, plugin_name): - """Validate that plugin commands don't override protected commands.""" - violations = [] - for cmd_name in plugin_commands.keys(): - if cmd_name in PROTECTED_COMMANDS: - violations.append(cmd_name) - - if violations: - raise ValueError( - f"Plugin attempted to override protected commands: {', '.join(violations)}. " - "This is a security violation and the plugin has been quarantined." - ) - - # Validate that all values are callable - for cmd_name, cmd_func in plugin_commands.items(): - if not callable(cmd_func): - raise ValueError( - f"Plugin command '{cmd_name}' is not callable. " - "All commands must be functions." - ) - - return True + # Delegate to plugins module + return plugins_validate_commands(plugin_commands, plugin_name, PROTECTED_COMMANDS) def load_plugins(self): """Load plugin commands from the plugins directory with security validation.""" @@ -1128,32 +646,7 @@ ZDTT Terminal v{self.version} def get_prompt(self): """Return the custom prompt string with enhanced colors""" - # Show current directory in prompt - cwd = os.getcwd() - # Show ~ for home directory - home = os.path.expanduser("~") - if cwd.startswith(home): - display_path = "~" + cwd[len(home):] - else: - display_path = cwd - - # Wrap ANSI codes in \001 and \002 so readline knows they're non-printable - # This fixes line wrapping issues with long commands - RL_PROMPT_START = '\001' - RL_PROMPT_END = '\002' - - # Create enhanced colorized prompt with gradient-like effect - # [username in bright green @ ZDTT in bright cyan path in bright blue]=> - prompt = (f"{RL_PROMPT_START}{self.COLOR_BRIGHT_CYAN}{RL_PROMPT_END}┌─{RL_PROMPT_START}{self.COLOR_RESET}{RL_PROMPT_END}" - f"[{RL_PROMPT_START}{self.COLOR_BRIGHT_GREEN}{RL_PROMPT_END}{self.username}" - f"{RL_PROMPT_START}{self.COLOR_RESET}{RL_PROMPT_END}" - f"{RL_PROMPT_START}{self.COLOR_BRIGHT_WHITE}{RL_PROMPT_END}@{RL_PROMPT_START}{self.COLOR_RESET}{RL_PROMPT_END}" - f"{RL_PROMPT_START}{self.COLOR_BRIGHT_CYAN}{RL_PROMPT_END}ZDTT{RL_PROMPT_START}{self.COLOR_RESET}{RL_PROMPT_END} " - f"{RL_PROMPT_START}{self.COLOR_BRIGHT_BLUE}{RL_PROMPT_END}{display_path}" - f"{RL_PROMPT_START}{self.COLOR_RESET}{RL_PROMPT_END}]" - f"{RL_PROMPT_START}{self.COLOR_BRIGHT_CYAN}{RL_PROMPT_END}─{RL_PROMPT_START}{self.COLOR_RESET}{RL_PROMPT_END}\n" - f"{RL_PROMPT_START}{self.COLOR_BRIGHT_CYAN}{RL_PROMPT_END}└─{RL_PROMPT_START}{self.COLOR_BRIGHT_MAGENTA}{RL_PROMPT_END}➜{RL_PROMPT_START}{self.COLOR_RESET}{RL_PROMPT_END} ") - return prompt + return ui_get_prompt(self) def cmd_help(self, args): """Display available commands with enhanced formatting""" @@ -1949,93 +1442,7 @@ ZDTT Terminal v{self.version} def _execute_system_command(self, command): """Execute a system command with real-time I/O streaming.""" - # Temporarily disable status bar updates during command execution - status_bar_was_running = self.status_bar_thread and self.status_bar_thread.is_alive() - - try: - # Start the process with direct stdin/stdout/stderr - process = subprocess.Popen( - command, - shell=True, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, # Merge stderr into stdout - stdin=sys.stdin, # Direct stdin passthrough - bufsize=1, # Line buffered - text=True, - cwd=self.current_dir - ) - - # Buffer for early output detection - early_output = [] - start_time = time_module.time() - check_timeout = 0.1 # 0.1 seconds - hide_output = False - output_buffer = [] - - # Read output in real-time - try: - while True: - # Read character by character for early detection - char = process.stdout.read(1) - if not char: - if process.poll() is not None: - break - time_module.sleep(0.01) - continue - - # Check for "command not found" in first 0.1 seconds - if time_module.time() - start_time < check_timeout: - early_output.append(char) - combined = ''.join(early_output).lower() - if 'command not found' in combined or 'not found:' in combined: - hide_output = True - # Consume remaining output silently - while process.poll() is None: - process.stdout.read(1) - break - - # Buffer output - output_buffer.append(char) - - # If we have a complete line or enough chars, flush - if char == '\n' or len(output_buffer) >= 1024: - if not hide_output: - sys.stdout.write(''.join(output_buffer)) - sys.stdout.flush() - output_buffer.clear() - - # Flush remaining buffer - if output_buffer and not hide_output: - sys.stdout.write(''.join(output_buffer)) - sys.stdout.flush() - - # Wait for process to finish - process.wait() - - except BrokenPipeError: - # Process closed stdout - pass - - except KeyboardInterrupt: - # Handle Ctrl+C - try: - if 'process' in locals(): - process.terminate() - process.wait(timeout=1) - except Exception: - try: - if 'process' in locals(): - process.kill() - except Exception: - pass - print("\n^C") - except Exception as e: - if not hide_output: - print(f"{self.COLOR_ERROR}Error executing command: {e}{self.COLOR_RESET}") - finally: - # Restore status bar if it was running - if status_bar_was_running: - self._render_status_bar() + shell_execute(self, command) def execute_command(self, command_line): """Parse and execute a command""" diff --git a/zdtt/__init__.py b/zdtt/__init__.py new file mode 100644 index 0000000..dc54eac --- /dev/null +++ b/zdtt/__init__.py @@ -0,0 +1,4 @@ +# ZDTT package + + + diff --git a/zdtt/__pycache__/__init__.cpython-312.pyc b/zdtt/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000..a5d8625 Binary files /dev/null and b/zdtt/__pycache__/__init__.cpython-312.pyc differ diff --git a/zdtt/__pycache__/config.cpython-312.pyc b/zdtt/__pycache__/config.cpython-312.pyc new file mode 100644 index 0000000..d8701e9 Binary files /dev/null and b/zdtt/__pycache__/config.cpython-312.pyc differ diff --git a/zdtt/__pycache__/plugins.cpython-312.pyc b/zdtt/__pycache__/plugins.cpython-312.pyc new file mode 100644 index 0000000..c60fabc Binary files /dev/null and b/zdtt/__pycache__/plugins.cpython-312.pyc differ diff --git a/zdtt/__pycache__/shell.cpython-312.pyc b/zdtt/__pycache__/shell.cpython-312.pyc new file mode 100644 index 0000000..a739182 Binary files /dev/null and b/zdtt/__pycache__/shell.cpython-312.pyc differ diff --git a/zdtt/__pycache__/status_bar.cpython-312.pyc b/zdtt/__pycache__/status_bar.cpython-312.pyc new file mode 100644 index 0000000..c4fe83e Binary files /dev/null and b/zdtt/__pycache__/status_bar.cpython-312.pyc differ diff --git a/zdtt/__pycache__/ui.cpython-312.pyc b/zdtt/__pycache__/ui.cpython-312.pyc new file mode 100644 index 0000000..5242472 Binary files /dev/null and b/zdtt/__pycache__/ui.cpython-312.pyc differ diff --git a/zdtt/config.py b/zdtt/config.py new file mode 100644 index 0000000..b1a1575 --- /dev/null +++ b/zdtt/config.py @@ -0,0 +1,166 @@ +""" +System detection and persistent configuration helpers for ZDTT. +""" +import os +import sys +import shutil +import json + +SUPPORTED_DEBIAN_IDS = { + 'debian', 'ubuntu', 'linuxmint', 'mint', 'pop', 'pop-os', 'pop_os', + 'elementary', 'zorin', 'kali', 'parrot', 'mx', 'mx-linux', 'deepin', + 'peppermint', 'raspbian', 'neon', +} + +SUPPORTED_ARCH_IDS = { + 'arch', 'archlinux', 'manjaro', 'endeavouros', 'endeavour', 'arcolinux', + 'garuda', 'artix', 'blackarch', 'chakra', +} + + +def _parse_os_release(): + data = {} + try: + with open('/etc/os-release', 'r') as f: + for line in f: + line = line.strip() + if not line or line.startswith('#') or '=' not in line: + continue + key, value = line.split('=', 1) + value = value.strip().strip('"') + data[key] = value + except FileNotFoundError: + pass + return data + + +def _collect_tokens(*values): + tokens = set() + for value in values: + if not value: + continue + normalized = value.replace('"', '').strip().lower() + if not normalized: + continue + tokens.add(normalized) + delimiters_replaced = normalized.replace('-', ' ').replace('_', ' ') + for part in delimiters_replaced.split(): + if part: + tokens.add(part) + return tokens + + +def _detect_supported_distro(): + if os.path.exists('/etc/debian_version'): + return 'debian' + + arch_markers = ('/etc/arch-release', '/etc/artix-release') + if any(os.path.exists(path) for path in arch_markers): + return 'arch' + + os_release = _parse_os_release() + tokens = _collect_tokens(os_release.get('ID'), os_release.get('ID_LIKE')) + + if tokens & SUPPORTED_DEBIAN_IDS: + return 'debian' + if tokens & SUPPORTED_ARCH_IDS: + return 'arch' + + if shutil.which('apt-get'): + return 'debian' + if shutil.which('pacman'): + return 'arch' + return 'other' + + +def _prompt_distro_override(detected_distro): + label_map = { + 'debian': "Debian-based", + 'arch': "Arch-based", + 'other': "Unsupported/Other", + } + print("=" * 60) + print(f"Detected distribution: {label_map.get(detected_distro, 'Unknown')}") + print("If this is incorrect, enter one of: debian / arch / other.") + print("Press Enter to accept the detected value.") + override = input("Override distribution (leave blank to keep): ").strip().lower() + if override in ('debian', 'arch', 'other'): + return override + if override: + print(f"Unknown override '{override}'. Using detected value.") + return detected_distro + + +def _load_saved_distro(): + config_file = os.path.expanduser("~/.zdtt/config.json") + try: + with open(config_file, 'r') as f: + data = json.load(f) + saved_distro = data.get('distro') + if saved_distro in ('debian', 'arch', 'other'): + return saved_distro + except (FileNotFoundError, json.JSONDecodeError, KeyError): + pass + return None + + +def _save_distro_preference(distro: str): + config_file = os.path.expanduser("~/.zdtt/config.json") + os.makedirs(os.path.dirname(config_file), exist_ok=True) + data = {} + try: + with open(config_file, 'r') as f: + data = json.load(f) + except (FileNotFoundError, json.JSONDecodeError): + data = {} + data['distro'] = distro + with open(config_file, 'w') as f: + json.dump(data, f, indent=2) + + +def check_system_compatibility(): + """Detect supported distributions and warn when unsupported. Returns 'debian' | 'arch' | 'other'.""" + saved_distro = _load_saved_distro() + if saved_distro: + return saved_distro + + if sys.platform != 'linux': + print("=" * 60) + print("⚠️ WARNING: ZDTT Terminal is designed for Linux systems") + print(f" Detected platform: {sys.platform}") + print("=" * 60) + print("ZDTT may not work correctly on your system.") + print("Some features may be unavailable or broken.") + print() + response = input("Continue anyway? (yes/no): ").strip().lower() + if response != 'yes': + print("Installation cancelled.") + sys.exit(0) + distro = 'other' + _save_distro_preference(distro) + return distro + + distro = _detect_supported_distro() + if distro not in ('debian', 'arch'): + print("=" * 60) + print("⚠️ WARNING: Unsupported Distribution Detected") + print("=" * 60) + print("ZDTT Terminal is optimized for Debian-based and Arch Linux systems.") + print() + print("Running on your current system may result in:") + print(" • Some commands may not work as expected") + print(" • Auto-install features may fail") + print(" • Reduced plugin compatibility") + print(" • Package management commands unavailable") + print() + response = input("Continue installation? (yes/no): ").strip().lower() + if response != 'yes': + print("Installation cancelled.") + sys.exit(0) + + distro = _prompt_distro_override(distro) + _save_distro_preference(distro) + return distro + + + diff --git a/zdtt/plugins.py b/zdtt/plugins.py new file mode 100644 index 0000000..1fb9e3e --- /dev/null +++ b/zdtt/plugins.py @@ -0,0 +1,106 @@ +""" +Plugin utilities for ZDTT: AST validation, quarantine, and command validation. +""" + +import os +import shutil +import ast +from typing import Dict, Callable, Iterable, Optional + +# Protected command names that plugins cannot override +PROTECTED_COMMANDS = { + 'ssh', 'sudo', 'su', 'cp', 'mv', 'rm', 'ls', 'cat', 'chmod', 'chown', + 'history', 'zps', 'zdtt', 'pip', 'python', 'python3', 'curl', 'wget' +} + + +def validate_plugin_ast(plugin_code: str, plugin_name: str) -> bool: + """ + Validate plugin AST to ensure no top-level code execution. + Only allows: imports, function definitions, class definitions, and docstrings. + Raises ValueError on violation. + """ + try: + tree = ast.parse(plugin_code) + except SyntaxError as e: + raise ValueError(f"Plugin has syntax errors: {e}") + + if not isinstance(tree, ast.Module): + raise ValueError("Plugin must be a valid Python module") + + for stmt in tree.body: + if isinstance(stmt, (ast.Import, ast.ImportFrom)): + continue + if isinstance(stmt, (ast.FunctionDef, ast.AsyncFunctionDef)): + continue + if isinstance(stmt, ast.ClassDef): + continue + if isinstance(stmt, ast.Expr): + # Allow docstring literals + if isinstance(stmt.value, (ast.Constant, ast.Str)): + if isinstance(stmt.value, ast.Constant): + if isinstance(stmt.value.value, str): + continue + else: + # ast.Str case (older Python) + continue + raise ValueError( + f"Plugin contains forbidden top-level statement: {stmt.__class__.__name__}. " + "Plugins can only contain imports, functions, classes, and docstrings. " + "No top-level code execution is allowed." + ) + + return True + + +def move_to_quarantine(plugin_file: str, quarantine_dir: str, logger) -> Optional[str]: + """ + Move a plugin file to quarantine directory and log the reason via caller. + Returns the final quarantine path or None on failure. + """ + plugin_name = os.path.basename(plugin_file) + os.makedirs(quarantine_dir, exist_ok=True) + + quarantine_path = os.path.join(quarantine_dir, plugin_name) + counter = 1 + while os.path.exists(quarantine_path): + name, ext = os.path.splitext(plugin_name) + quarantine_path = os.path.join(quarantine_dir, f"{name}_{counter}{ext}") + counter += 1 + + try: + shutil.move(plugin_file, quarantine_path) + logger.warning(f"Plugin '{plugin_name}' quarantined") + logger.warning(f"Moved to: {quarantine_path}") + return quarantine_path + except Exception as e: + logger.error(f"Failed to quarantine plugin '{plugin_name}': {e}") + return None + + +def validate_plugin_commands( + plugin_commands: Dict[str, Callable], + plugin_name: str, + protected_commands: Iterable[str] = PROTECTED_COMMANDS, +) -> bool: + """ + Ensure plugins do not override protected commands and values are callable. + Raises ValueError on violation. + """ + violations = [cmd for cmd in plugin_commands.keys() if cmd in protected_commands] + if violations: + raise ValueError( + f"Plugin attempted to override protected commands: {', '.join(violations)}. " + "This is a security violation and the plugin has been quarantined." + ) + + for cmd_name, cmd_func in plugin_commands.items(): + if not callable(cmd_func): + raise ValueError( + f"Plugin command '{cmd_name}' is not callable. All commands must be functions." + ) + + return True + + + diff --git a/zdtt/shell.py b/zdtt/shell.py new file mode 100644 index 0000000..f354f06 --- /dev/null +++ b/zdtt/shell.py @@ -0,0 +1,82 @@ +""" +System shell command execution utilities for ZDTT. +""" +import sys +import subprocess +import time as time_module + + +def execute_system_command(terminal, command: str): + """Execute a system command with real-time I/O streaming.""" + status_bar_was_running = terminal.status_bar_thread and terminal.status_bar_thread.is_alive() + try: + process = subprocess.Popen( + command, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + stdin=sys.stdin, + bufsize=1, + text=True, + cwd=terminal.current_dir + ) + + early_output = [] + start_time = time_module.time() + check_timeout = 0.1 + hide_output = False + output_buffer = [] + + try: + while True: + char = process.stdout.read(1) + if not char: + if process.poll() is not None: + break + time_module.sleep(0.01) + continue + + if time_module.time() - start_time < check_timeout: + early_output.append(char) + combined = ''.join(early_output).lower() + if 'command not found' in combined or 'not found:' in combined: + hide_output = True + while process.poll() is None: + process.stdout.read(1) + break + + output_buffer.append(char) + if char == '\n' or len(output_buffer) >= 1024: + if not hide_output: + sys.stdout.write(''.join(output_buffer)) + sys.stdout.flush() + output_buffer.clear() + + if output_buffer and not hide_output: + sys.stdout.write(''.join(output_buffer)) + sys.stdout.flush() + + process.wait() + except BrokenPipeError: + pass + except KeyboardInterrupt: + try: + if 'process' in locals(): + process.terminate() + process.wait(timeout=1) + except Exception: + try: + if 'process' in locals(): + process.kill() + except Exception: + pass + print("\n^C") + except Exception as e: + if not locals().get('hide_output', False): + print(f"{terminal.COLOR_ERROR}Error executing command: {e}{terminal.COLOR_RESET}") + finally: + if status_bar_was_running: + terminal._render_status_bar() + + + diff --git a/zdtt/status_bar.py b/zdtt/status_bar.py new file mode 100644 index 0000000..11625b5 --- /dev/null +++ b/zdtt/status_bar.py @@ -0,0 +1,154 @@ +""" +Status bar, scroll region, and resize handling utilities for ZDTT. +All functions operate on the provided terminal instance. +""" +import sys +import shutil +from datetime import datetime + + +def set_scroll_region(terminal): + try: + rows = shutil.get_terminal_size().lines + rows = max(rows, 2) + sys.stdout.write(f"\033[2;{rows}r") + sys.stdout.write("\033[1;1H") + sys.stdout.write("\033[2K") + sys.stdout.write("\033[2;1H") + sys.stdout.flush() + terminal.scroll_region_set = True + except Exception: + terminal.scroll_region_set = False + + +def reset_scroll_region(terminal): + if not terminal.scroll_region_set: + return + sys.stdout.write("\033[r") + sys.stdout.flush() + terminal.scroll_region_set = False + + +def build_status_bar_text(terminal): + left_text = f"{terminal.COLOR_BOLD}ZDTT{terminal.COLOR_RESET} by {terminal.COLOR_BOLD}ZaneDev{terminal.COLOR_RESET}" + time_str = datetime.now().strftime("%I:%M %p") + plain_left = "ZDTT by ZaneDev" + plain_time = time_str + + try: + term_size = shutil.get_terminal_size() + width = max(1, term_size.columns) + except Exception: + width = max(1, len(plain_left) + len(plain_time) + 6) + + min_content_width = len(plain_left) + len(plain_time) + 5 + padding = 0 if width < min_content_width else width - min_content_width + separator = f"{terminal.COLOR_DIM}│{terminal.COLOR_RESET}" + bar_content = f" {left_text} {' ' * padding}{separator} {terminal.COLOR_BRIGHT_WHITE}{time_str}{terminal.COLOR_RESET} " + actual_display_len = len(plain_left) + len(plain_time) + padding + 5 + + if actual_display_len < width: + trailing_spaces = width - actual_display_len + bar_content = bar_content.rstrip() + ' ' * trailing_spaces + elif actual_display_len > width: + padding = max(0, width - min_content_width) + bar_content = f" {left_text} {' ' * padding}{separator} {terminal.COLOR_BRIGHT_WHITE}{time_str}{terminal.COLOR_RESET} " + actual_display_len = len(plain_left) + len(plain_time) + padding + 5 + if actual_display_len < width: + trailing_spaces = width - actual_display_len + bar_content = bar_content.rstrip() + ' ' * trailing_spaces + else: + if width < len(plain_left) + 10: + bar_content = f" {left_text} {separator} {terminal.COLOR_BRIGHT_WHITE}{time_str[:8]}{terminal.COLOR_RESET} " + bar_content = bar_content[:width] if len(bar_content) > width else bar_content + + bg_code, fg_code = terminal.STATUS_BAR_COLORS_LOOKUP() + result = f"\033[{bg_code}m\033[{fg_code}m{bar_content}\033[0m" + if len(result) > width * 2: + simple_bar = f" ZDTT by ZaneDev | {time_str} " + simple_bar = simple_bar[:width] if len(simple_bar) > width else simple_bar.ljust(width) + result = f"\033[{bg_code}m\033[{fg_code}m{simple_bar}\033[0m" + return result + + +def render_status_bar(terminal): + try: + try: + term_size = shutil.get_terminal_size() + max_width = term_size.columns + except Exception: + max_width = 80 + bar_text = build_status_bar_text(terminal) + if len(bar_text) > max_width * 3: + bar_text = build_status_bar_text(terminal) + sys.stdout.write("\033[s") + sys.stdout.write("\033[1;1H") + sys.stdout.write("\033[2K") + sys.stdout.write("\033[0m") + sys.stdout.write(bar_text) + sys.stdout.write("\033[0m") + sys.stdout.write(f"\033[{max_width}G") + sys.stdout.write("\033[u") + sys.stdout.flush() + except Exception: + pass + + +def status_bar_loop(terminal): + while not terminal.status_bar_stop_event.is_set(): + render_status_bar(terminal) + if terminal.status_bar_stop_event.wait(2): + break + + +def start_status_bar_thread(terminal): + if terminal.status_bar_thread and terminal.status_bar_thread.is_alive(): + return + terminal.status_bar_stop_event.clear() + terminal.status_bar_thread = terminal._spawn_thread(target=lambda: status_bar_loop(terminal), name="ZDTTStatusBar") + terminal.status_bar_thread.start() + + +def initialize_status_bar(terminal): + set_scroll_region(terminal) + start_status_bar_thread(terminal) + render_status_bar(terminal) + + +def shutdown_status_bar(terminal): + terminal.status_bar_stop_event.set() + if terminal.status_bar_thread and terminal.status_bar_thread.is_alive(): + terminal.status_bar_thread.join(timeout=0.5) + terminal.status_bar_thread = None + reset_scroll_region(terminal) + + +def handle_resize(terminal, signum=None, frame=None): + if not terminal.resize_lock.acquire(blocking=False): + return + try: + import time as time_module + time_module.sleep(0.05) + reset_scroll_region(terminal) + set_scroll_region(terminal) + try: + sys.stdout.write("\033[1;1H") + sys.stdout.write("\033[2K") + sys.stdout.write("\033[0m") + sys.stdout.flush() + except Exception: + pass + render_status_bar(terminal) + try: + term_size = shutil.get_terminal_size() + sys.stdout.write(f"\033[{term_size.lines};1H") + sys.stdout.flush() + except Exception: + pass + except Exception: + pass + finally: + terminal.resize_lock.release() + + + diff --git a/zdtt/ui.py b/zdtt/ui.py new file mode 100644 index 0000000..5eb1bb5 --- /dev/null +++ b/zdtt/ui.py @@ -0,0 +1,85 @@ +""" +UI helpers for ZDTT: banner, compatibility warning, and prompt. +""" +import os +import shutil + + +def display_banner(terminal): + print() + try: + term_size = shutil.get_terminal_size() + min_height = 13 if not terminal.is_supported else 11 + min_width = 44 + if term_size.columns < min_width or term_size.lines < min_height: + print(f"ZDTT Terminal v{terminal.version}") + if not terminal.is_supported: + _show_compatibility_warning(terminal) + print() + return + except Exception: + pass + + if os.path.exists(terminal.banner_file): + try: + with open(terminal.banner_file, 'r') as f: + custom_banner = f.read() + if '{version}' in custom_banner: + custom_banner = custom_banner.replace('{version}', terminal.version) + print(custom_banner) + if not terminal.is_supported: + _show_compatibility_warning(terminal) + return + except Exception as e: + import logging + logging.error(f"Failed to load custom banner: {e}") + + banner = f""" +░█████████ ░███████ ░██████████░██████████ + ░██ ░██ ░██ ░██ ░██ + ░██ ░██ ░██ ░██ ░██ + ░███ ░██ ░██ ░██ ░██ + ░██ ░██ ░██ ░██ ░██ + ░██ ░██ ░██ ░██ ░██ +░█████████ ░███████ ░██ ░██ + + +ZDTT Terminal v{terminal.version} +""" + print(banner) + if not terminal.is_supported: + _show_compatibility_warning(terminal) + + +def _show_compatibility_warning(terminal): + if terminal.is_supported: + return + print() + print("⚠️ Running on unsupported system - limited support") + print(" Tested on Debian-based and Arch Linux distributions.") + print() + + +def get_prompt(terminal): + cwd = os.getcwd() + home = os.path.expanduser("~") + if cwd.startswith(home): + display_path = "~" + cwd[len(home):] + else: + display_path = cwd + + RL_PROMPT_START = '\001' + RL_PROMPT_END = '\002' + prompt = (f"{RL_PROMPT_START}{terminal.COLOR_BRIGHT_CYAN}{RL_PROMPT_END}┌─{RL_PROMPT_START}{terminal.COLOR_RESET}{RL_PROMPT_END}" + f"[{RL_PROMPT_START}{terminal.COLOR_BRIGHT_GREEN}{RL_PROMPT_END}{terminal.username}" + f"{RL_PROMPT_START}{terminal.COLOR_RESET}{RL_PROMPT_END}" + f"{RL_PROMPT_START}{terminal.COLOR_BRIGHT_WHITE}{RL_PROMPT_END}@{RL_PROMPT_START}{terminal.COLOR_RESET}{RL_PROMPT_END}" + f"{RL_PROMPT_START}{terminal.COLOR_BRIGHT_CYAN}{RL_PROMPT_END}ZDTT{RL_PROMPT_START}{terminal.COLOR_RESET}{RL_PROMPT_END} " + f"{RL_PROMPT_START}{terminal.COLOR_BRIGHT_BLUE}{RL_PROMPT_END}{display_path}" + f"{RL_PROMPT_START}{terminal.COLOR_RESET}{RL_PROMPT_END}]" + f"{RL_PROMPT_START}{terminal.COLOR_BRIGHT_CYAN}{RL_PROMPT_END}─{RL_PROMPT_START}{terminal.COLOR_RESET}{RL_PROMPT_END}\n" + f"{RL_PROMPT_START}{terminal.COLOR_BRIGHT_CYAN}{RL_PROMPT_END}└─{RL_PROMPT_START}{terminal.COLOR_BRIGHT_MAGENTA}{RL_PROMPT_END}➜{RL_PROMPT_START}{terminal.COLOR_RESET}{RL_PROMPT_END} ") + return prompt + + +