diff --git a/index.html b/index.html index 5139e23..0c9af29 100644 --- a/index.html +++ b/index.html @@ -46,7 +46,7 @@

Current release

-

v0.1.2.a.9

+

v0.1.2.b

Supported families

diff --git a/terminal.py b/terminal.py index 158557e..cc867cf 100644 --- a/terminal.py +++ b/terminal.py @@ -23,39 +23,39 @@ import urllib.request import urllib.error import time as time_module -# Ensure local 'zdtt' package is importable both from repo and installed locations -try: - # Probe a quick import to determine availability - import zdtt # type: ignore -except Exception: - script_dir = os.path.dirname(os.path.abspath(__file__)) - candidates = [ - script_dir, - os.path.abspath(os.path.join(script_dir, '..')), - os.path.join(script_dir, 'zdtt'), - '/home/zane/ZDTT', - ] - for path_candidate in candidates: - if path_candidate and os.path.isdir(path_candidate) and path_candidate not in sys.path: - sys.path.insert(0, path_candidate) - # Best-effort: ignore failure here; regular imports will raise if still missing - try: - import zdtt # type: ignore - except Exception: - pass -from zdtt.plugins import ( - PROTECTED_COMMANDS as PLUGIN_PROTECTED_COMMANDS, - validate_plugin_ast as plugins_validate_ast, - validate_plugin_commands as plugins_validate_commands, - move_to_quarantine as plugins_move_to_quarantine, -) -from zdtt.config import ( - check_system_compatibility, -) -from zdtt import status_bar as sb -from zdtt.shell import execute_system_command as shell_execute -from zdtt.ui import display_banner as ui_display_banner, get_prompt as ui_get_prompt +SUPPORTED_DEBIAN_IDS = { + 'debian', + 'ubuntu', + 'linuxmint', + 'mint', + 'pop', + 'pop-os', + 'pop_os', + 'elementary', + 'zorin', + 'kali', + 'parrot', + 'mx', + 'mx-linux', + 'deepin', + 'peppermint', + 'raspbian', + 'neon', +} + +SUPPORTED_ARCH_IDS = { + 'arch', + 'archlinux', + 'manjaro', + 'endeavouros', + 'endeavour', + 'arcolinux', + 'garuda', + 'artix', + 'blackarch', + 'chakra', +} STATUS_BAR_COLORS = { 'blue': ('44', '97'), @@ -68,15 +68,182 @@ STATUS_BAR_COLORS = { 'black': ('40', '97'), } -# Protected command names that plugins cannot override (sourced from plugins module) -PROTECTED_COMMANDS = PLUGIN_PROTECTED_COMMANDS - -# Helper for status bar module to get current color codes -def _STATUS_BAR_COLORS_LOOKUP(self): - return STATUS_BAR_COLORS.get(self.status_bar_color, ('44', '97')) +# Protected command names that plugins cannot override +PROTECTED_COMMANDS = { + 'ssh', 'sudo', 'su', 'cp', 'mv', 'rm', 'ls', 'cat', 'chmod', 'chown', + 'history', 'zps', 'zdtt', 'pip', 'python', 'python3', 'curl', 'wget' +} -## moved to zdtt.config +def _parse_os_release(): + """Return a dict of fields from /etc/os-release if available""" + data = {} + try: + with open('/etc/os-release', 'r') as f: + for line in f: + line = line.strip() + if not line or line.startswith('#') or '=' not in line: + continue + key, value = line.split('=', 1) + value = value.strip().strip('"') + data[key] = value + except FileNotFoundError: + pass + return data + + +def _collect_tokens(*values): + """Normalize distro identifiers for comparison""" + tokens = set() + for value in values: + if not value: + continue + normalized = value.replace('"', '').strip().lower() + if not normalized: + continue + # Keep the raw normalized value plus its dashed/underscored variants split + tokens.add(normalized) + delimiters_replaced = normalized.replace('-', ' ').replace('_', ' ') + for part in delimiters_replaced.split(): + if part: + tokens.add(part) + return tokens + + +def _detect_supported_distro(): + """Return distro identifier: 'debian', 'arch', or 'other'""" + if os.path.exists('/etc/debian_version'): + return 'debian' + + arch_markers = ( + '/etc/arch-release', + '/etc/artix-release', + ) + if any(os.path.exists(path) for path in arch_markers): + return 'arch' + + os_release = _parse_os_release() + tokens = _collect_tokens(os_release.get('ID'), os_release.get('ID_LIKE')) + + if tokens & SUPPORTED_DEBIAN_IDS: + return 'debian' + if tokens & SUPPORTED_ARCH_IDS: + return 'arch' + + # Fallback to package manager detection + if shutil.which('apt-get'): + return 'debian' + if shutil.which('pacman'): + return 'arch' + + return 'other' + + +def _prompt_distro_override(detected_distro): + """Allow user to override detected distro.""" + label_map = { + 'debian': "Debian-based", + 'arch': "Arch-based", + 'other': "Unsupported/Other", + } + print("=" * 60) + print(f"Detected distribution: {label_map.get(detected_distro, 'Unknown')}") + print("If this is incorrect, enter one of: debian / arch / other.") + print("Press Enter to accept the detected value.") + override = input("Override distribution (leave blank to keep): ").strip().lower() + + if override in ('debian', 'arch', 'other'): + return override + + if override: + print(f"Unknown override '{override}'. Using detected value.") + + return detected_distro + + +def _load_saved_distro(): + """Load saved distro preference from config file.""" + config_file = os.path.expanduser("~/.zdtt/config.json") + try: + with open(config_file, 'r') as f: + data = json.load(f) + saved_distro = data.get('distro') + if saved_distro in ('debian', 'arch', 'other'): + return saved_distro + except (FileNotFoundError, json.JSONDecodeError, KeyError): + pass + return None + +def _save_distro_preference(distro): + """Save distro preference to config file.""" + config_file = os.path.expanduser("~/.zdtt/config.json") + os.makedirs(os.path.dirname(config_file), exist_ok=True) + + data = {} + try: + with open(config_file, 'r') as f: + data = json.load(f) + except (FileNotFoundError, json.JSONDecodeError): + data = {} + + data['distro'] = distro + + with open(config_file, 'w') as f: + json.dump(data, f, indent=2) + +def check_system_compatibility(): + """Detect supported distributions and warn when unsupported""" + # Check for saved distro preference first + saved_distro = _load_saved_distro() + if saved_distro: + # Use saved preference, skip prompts + return saved_distro + + # Check if running on Linux + if sys.platform != 'linux': + print("=" * 60) + print("⚠️ WARNING: ZDTT Terminal is designed for Linux systems") + print(f" Detected platform: {sys.platform}") + print("=" * 60) + print("ZDTT may not work correctly on your system.") + print("Some features may be unavailable or broken.") + print() + response = input("Continue anyway? (yes/no): ").strip().lower() + if response != 'yes': + print("Installation cancelled.") + sys.exit(0) + distro = 'other' + _save_distro_preference(distro) + return distro + + # Detect supported distributions + distro = _detect_supported_distro() + + if distro not in ('debian', 'arch'): + # Unsupported distribution + print("=" * 60) + print("⚠️ WARNING: Unsupported Distribution Detected") + print("=" * 60) + print("ZDTT Terminal is optimized for Debian-based and Arch Linux systems.") + print() + print("Running on your current system may result in:") + print(" • Some commands may not work as expected") + print(" • Auto-install features may fail") + print(" • Reduced plugin compatibility") + print(" • Package management commands unavailable") + print() + response = input("Continue installation? (yes/no): ").strip().lower() + if response != 'yes': + print("Installation cancelled.") + sys.exit(0) + + # Offer override regardless of detection + distro = _prompt_distro_override(distro) + + # Save the selected distro preference + _save_distro_preference(distro) + + return distro class ZDTTTerminal: @@ -106,8 +273,6 @@ class ZDTTTerminal: self.safe_mode = False # Safe mode flag (no plugins loaded) self.quarantine_warnings = [] # Store warnings for plugins quarantined at startup self.trusted_plugins = set() # Plugins allowed to use imports - # Expose STATUS_BAR_COLORS lookup to status bar module - self.STATUS_BAR_COLORS_LOOKUP = lambda: _STATUS_BAR_COLORS_LOOKUP(self) # Setup logging for plugins self.setup_logging() @@ -310,49 +475,272 @@ class ZDTTTerminal: def display_banner(self): """Display the ZDTT ASCII art banner (or custom banner if available)""" - ui_display_banner(self) + print() + + # Check terminal size to see if banner will fit + try: + term_size = shutil.get_terminal_size() + # Banner is 44 chars wide and 11 lines tall (including version) + # Add extra space for compatibility warning if needed + min_height = 13 if not self.is_supported else 11 + min_width = 44 + + if term_size.columns < min_width or term_size.lines < min_height: + # Terminal too small, skip banner and just show minimal header + print(f"ZDTT Terminal v{self.version}") + if not self.is_supported: + print("⚠️ Unsupported system - limited support") + print() + return + except Exception: + # If we can't get terminal size, display the banner anyway + pass + + # Check for custom banner + if os.path.exists(self.banner_file): + try: + with open(self.banner_file, 'r') as f: + custom_banner = f.read() + # Add version at the bottom if not already present + if '{version}' in custom_banner: + custom_banner = custom_banner.replace('{version}', self.version) + print(custom_banner) + # Show warning for unsupported systems + if not self.is_supported: + self._show_compatibility_warning() + return + except Exception as e: + logging.error(f"Failed to load custom banner: {e}") + # Fall through to default banner + + # Default banner + banner = f""" +░█████████ ░███████ ░██████████░██████████ + ░██ ░██ ░██ ░██ ░██ + ░██ ░██ ░██ ░██ ░██ + ░███ ░██ ░██ ░██ ░██ + ░██ ░██ ░██ ░██ ░██ + ░██ ░██ ░██ ░██ ░██ +░█████████ ░███████ ░██ ░██ + + +ZDTT Terminal v{self.version} +""" + print(banner) + + # Show warning for unsupported systems after banner + if not self.is_supported: + self._show_compatibility_warning() def _show_compatibility_warning(self): """Show compatibility warning for unsupported systems""" - from zdtt.ui import _show_compatibility_warning as ui_warn - ui_warn(self) + if self.is_supported: + return + + print() + print("⚠️ Running on unsupported system - limited support") + print(" Tested on Debian-based and Arch Linux distributions.") + print() def initialize_status_bar(self): """Reserve the first terminal row and start the status bar thread.""" - sb.initialize_status_bar(self) + self._set_scroll_region() + self._start_status_bar_thread() + self._render_status_bar() def shutdown_status_bar(self): """Stop the status bar thread and release terminal state.""" - sb.shutdown_status_bar(self) + self.status_bar_stop_event.set() + if self.status_bar_thread and self.status_bar_thread.is_alive(): + self.status_bar_thread.join(timeout=0.5) + self.status_bar_thread = None + self._reset_scroll_region() def _start_status_bar_thread(self): - sb.start_status_bar_thread(self) + if self.status_bar_thread and self.status_bar_thread.is_alive(): + return + self.status_bar_stop_event.clear() + self.status_bar_thread = threading.Thread( + target=self._status_bar_loop, + name="ZDTTStatusBar", + daemon=True, + ) + self.status_bar_thread.start() def _status_bar_loop(self): - sb.status_bar_loop(self) + while not self.status_bar_stop_event.is_set(): + self._render_status_bar() + if self.status_bar_stop_event.wait(2): + break def _render_status_bar(self): """Render a single-line status bar with branding and time.""" - sb.render_status_bar(self) + try: + # Get terminal size first to ensure we don't write beyond bounds + try: + term_size = shutil.get_terminal_size() + max_width = term_size.columns + except Exception: + max_width = 80 # Fallback + + bar_text = self._build_status_bar_text() + + # Ensure bar_text doesn't exceed terminal width (safety check) + # Count visible characters (approximate - ANSI codes don't count) + # This is a rough check, but better than nothing + if len(bar_text) > max_width * 3: # Allow for ANSI codes (rough estimate) + # Rebuild with safer width + bar_text = self._build_status_bar_text() + + sys.stdout.write("\033[s") # Save cursor position + sys.stdout.write("\033[1;1H") # Move to first row, first column + sys.stdout.write("\033[2K") # Clear the entire line + sys.stdout.write("\033[0m") # Reset all attributes + sys.stdout.write(bar_text) + sys.stdout.write("\033[0m") # Ensure reset at end + # Move cursor to end of line to prevent wrapping issues + sys.stdout.write(f"\033[{max_width}G") # Move to column max_width + sys.stdout.write("\033[u") # Restore cursor + sys.stdout.flush() + except Exception: + # Fallback: just skip rendering if there's an error + pass def _build_status_bar_text(self): """Render a single-line status bar with enhanced branding and time.""" - return sb.build_status_bar_text(self) + left_text = f"{self.COLOR_BOLD}ZDTT{self.COLOR_RESET} by {self.COLOR_BOLD}ZaneDev{self.COLOR_RESET}" + time_str = datetime.now().strftime("%I:%M %p") + plain_left = "ZDTT by ZaneDev" + plain_time = time_str + + try: + # Always get fresh terminal size to handle resizes + term_size = shutil.get_terminal_size() + width = term_size.columns + # Safety: ensure width is at least 1 + width = max(1, width) + except Exception: + # Fallback to minimum width if we can't get terminal size + width = max(1, len(plain_left) + len(plain_time) + 6) + + # Calculate the minimum content width (plain text only, no ANSI codes) + # Format: " ZDTT by ZaneDev | TIME " + min_content_width = len(plain_left) + len(plain_time) + 5 # 5 = spaces + separator + + # Calculate padding to fill the line + if width < min_content_width: + # Terminal too narrow, use minimum padding + padding = 0 + else: + padding = width - min_content_width + + # Build the content (plain text calculation) + separator = f"{self.COLOR_DIM}│{self.COLOR_RESET}" + bar_content = f" {left_text} {' ' * padding}{separator} {self.COLOR_BRIGHT_WHITE}{time_str}{self.COLOR_RESET} " + + # Calculate actual display length (plain text only) + actual_display_len = len(plain_left) + len(plain_time) + padding + 5 + + # Ensure we fill exactly to terminal width (but never exceed it) + if actual_display_len < width: + # Add trailing spaces to fill exactly to width + trailing_spaces = width - actual_display_len + bar_content = bar_content.rstrip() + ' ' * trailing_spaces + elif actual_display_len > width: + # We exceeded width, recalculate with less padding + padding = max(0, width - min_content_width) + bar_content = f" {left_text} {' ' * padding}{separator} {self.COLOR_BRIGHT_WHITE}{time_str}{self.COLOR_RESET} " + actual_display_len = len(plain_left) + len(plain_time) + padding + 5 + if actual_display_len < width: + trailing_spaces = width - actual_display_len + bar_content = bar_content.rstrip() + ' ' * trailing_spaces + else: + # Still too wide, trim the time if necessary + if width < len(plain_left) + 10: + # Very narrow terminal, just show minimal content + bar_content = f" {left_text} {separator} {self.COLOR_BRIGHT_WHITE}{time_str[:8]}{self.COLOR_RESET} " + bar_content = bar_content[:width] if len(bar_content) > width else bar_content + + # Final safety check: ensure we never exceed terminal width + # This is approximate since ANSI codes don't count, but better than nothing + bg_code, fg_code = STATUS_BAR_COLORS.get(self.status_bar_color, ('44', '97')) + result = f"\033[{bg_code}m\033[{fg_code}m{bar_content}\033[0m" + + # If the result is suspiciously long, truncate it + # (rough heuristic: ANSI codes add ~30-50 chars, so if result > width*2, it's probably wrong) + if len(result) > width * 2: + # Emergency fallback: simple status bar + simple_bar = f" ZDTT by ZaneDev | {time_str} " + simple_bar = simple_bar[:width] if len(simple_bar) > width else simple_bar.ljust(width) + result = f"\033[{bg_code}m\033[{fg_code}m{simple_bar}\033[0m" + + return result def _set_scroll_region(self): """Reserve the top row for the status bar.""" - sb.set_scroll_region(self) + try: + rows = shutil.get_terminal_size().lines + rows = max(rows, 2) + sys.stdout.write(f"\033[2;{rows}r") + sys.stdout.write("\033[1;1H") + sys.stdout.write("\033[2K") + sys.stdout.write("\033[2;1H") + sys.stdout.flush() + self.scroll_region_set = True + except Exception: + self.scroll_region_set = False def _reset_scroll_region(self): """Restore default scrolling behavior.""" - sb.reset_scroll_region(self) + if not self.scroll_region_set: + return + sys.stdout.write("\033[r") + sys.stdout.flush() + self.scroll_region_set = False def _handle_resize(self, signum=None, frame=None): """Handle terminal resize event (SIGWINCH).""" - sb.handle_resize(self, signum, frame) - - def _spawn_thread(self, target, name): - return threading.Thread(target=target, name=name, daemon=True) + # Use a lock to prevent race conditions + if not self.resize_lock.acquire(blocking=False): + # If we can't acquire the lock immediately, skip this resize + # (another resize is already being handled) + return + + try: + # Small delay to let terminal settle after resize + time_module.sleep(0.05) + + # Reset scroll region first to clear any corrupted state + self._reset_scroll_region() + + # Update scroll region with new terminal size + self._set_scroll_region() + + # Clear the status bar line completely before redrawing + try: + sys.stdout.write("\033[1;1H") # Move to first row + sys.stdout.write("\033[2K") # Clear the entire line + sys.stdout.write("\033[0m") # Reset attributes + sys.stdout.flush() + except Exception: + pass + + # Force immediate status bar refresh + self._render_status_bar() + + # Ensure cursor is in a safe position + try: + term_size = shutil.get_terminal_size() + sys.stdout.write(f"\033[{term_size.lines};1H") # Move to last line, first column + sys.stdout.flush() + except Exception: + pass + + except Exception: + # Silently fail if resize handling fails + pass + finally: + self.resize_lock.release() def setup_readline(self): """Setup readline for history and tab completion""" @@ -412,19 +800,92 @@ class ZDTTTerminal: return None def _validate_plugin_ast(self, plugin_code, plugin_name): - # Delegate to plugins module - return plugins_validate_ast(plugin_code, plugin_name) + """ + Validate plugin AST to ensure no top-level code execution. + Only allows: imports, function definitions, class definitions, and docstrings. + """ + try: + tree = ast.parse(plugin_code) + except SyntaxError as e: + raise ValueError(f"Plugin has syntax errors: {e}") + + # Check module body directly - this is the most reliable way + if not isinstance(tree, ast.Module): + raise ValueError("Plugin must be a valid Python module") + + for stmt in tree.body: + # Allow imports + if isinstance(stmt, (ast.Import, ast.ImportFrom)): + continue + # Allow function definitions + if isinstance(stmt, (ast.FunctionDef, ast.AsyncFunctionDef)): + continue + # Allow class definitions + if isinstance(stmt, ast.ClassDef): + continue + # Allow docstrings (Expr with string constant) + if isinstance(stmt, ast.Expr): + # Check if it's a string literal (docstring) + if isinstance(stmt.value, (ast.Constant, ast.Str)): + # For Python 3.8+, use ast.Constant; for older versions, use ast.Str + if isinstance(stmt.value, ast.Constant): + if isinstance(stmt.value.value, str): + continue + elif isinstance(stmt.value, ast.Str): + continue + # Anything else is forbidden (assignments, function calls, loops, etc.) + raise ValueError( + f"Plugin contains forbidden top-level statement: {stmt.__class__.__name__}. " + "Plugins can only contain imports, functions, classes, and docstrings. " + "No top-level code execution is allowed." + ) + + return True def _move_to_quarantine(self, plugin_file, reason): - # Delegate to plugins module - path = plugins_move_to_quarantine(plugin_file, self.quarantine_dir, logging) - if path: - logging.warning(f"Reason: {reason}") - return path + """Move a plugin file to quarantine directory and log the reason.""" + plugin_name = os.path.basename(plugin_file) + os.makedirs(self.quarantine_dir, exist_ok=True) + + # Create unique filename if already exists + quarantine_path = os.path.join(self.quarantine_dir, plugin_name) + counter = 1 + while os.path.exists(quarantine_path): + name, ext = os.path.splitext(plugin_name) + quarantine_path = os.path.join(self.quarantine_dir, f"{name}_{counter}{ext}") + counter += 1 + + try: + shutil.move(plugin_file, quarantine_path) + logging.warning(f"Plugin '{plugin_name}' quarantined: {reason}") + logging.warning(f"Moved to: {quarantine_path}") + return quarantine_path + except Exception as e: + logging.error(f"Failed to quarantine plugin '{plugin_name}': {e}") + return None def _validate_plugin_commands(self, plugin_commands, plugin_name): - # Delegate to plugins module - return plugins_validate_commands(plugin_commands, plugin_name, PROTECTED_COMMANDS) + """Validate that plugin commands don't override protected commands.""" + violations = [] + for cmd_name in plugin_commands.keys(): + if cmd_name in PROTECTED_COMMANDS: + violations.append(cmd_name) + + if violations: + raise ValueError( + f"Plugin attempted to override protected commands: {', '.join(violations)}. " + "This is a security violation and the plugin has been quarantined." + ) + + # Validate that all values are callable + for cmd_name, cmd_func in plugin_commands.items(): + if not callable(cmd_func): + raise ValueError( + f"Plugin command '{cmd_name}' is not callable. " + "All commands must be functions." + ) + + return True def load_plugins(self): """Load plugin commands from the plugins directory with security validation.""" @@ -667,7 +1128,32 @@ class ZDTTTerminal: def get_prompt(self): """Return the custom prompt string with enhanced colors""" - return ui_get_prompt(self) + # Show current directory in prompt + cwd = os.getcwd() + # Show ~ for home directory + home = os.path.expanduser("~") + if cwd.startswith(home): + display_path = "~" + cwd[len(home):] + else: + display_path = cwd + + # Wrap ANSI codes in \001 and \002 so readline knows they're non-printable + # This fixes line wrapping issues with long commands + RL_PROMPT_START = '\001' + RL_PROMPT_END = '\002' + + # Create enhanced colorized prompt with gradient-like effect + # [username in bright green @ ZDTT in bright cyan path in bright blue]=> + prompt = (f"{RL_PROMPT_START}{self.COLOR_BRIGHT_CYAN}{RL_PROMPT_END}┌─{RL_PROMPT_START}{self.COLOR_RESET}{RL_PROMPT_END}" + f"[{RL_PROMPT_START}{self.COLOR_BRIGHT_GREEN}{RL_PROMPT_END}{self.username}" + f"{RL_PROMPT_START}{self.COLOR_RESET}{RL_PROMPT_END}" + f"{RL_PROMPT_START}{self.COLOR_BRIGHT_WHITE}{RL_PROMPT_END}@{RL_PROMPT_START}{self.COLOR_RESET}{RL_PROMPT_END}" + f"{RL_PROMPT_START}{self.COLOR_BRIGHT_CYAN}{RL_PROMPT_END}ZDTT{RL_PROMPT_START}{self.COLOR_RESET}{RL_PROMPT_END} " + f"{RL_PROMPT_START}{self.COLOR_BRIGHT_BLUE}{RL_PROMPT_END}{display_path}" + f"{RL_PROMPT_START}{self.COLOR_RESET}{RL_PROMPT_END}]" + f"{RL_PROMPT_START}{self.COLOR_BRIGHT_CYAN}{RL_PROMPT_END}─{RL_PROMPT_START}{self.COLOR_RESET}{RL_PROMPT_END}\n" + f"{RL_PROMPT_START}{self.COLOR_BRIGHT_CYAN}{RL_PROMPT_END}└─{RL_PROMPT_START}{self.COLOR_BRIGHT_MAGENTA}{RL_PROMPT_END}➜{RL_PROMPT_START}{self.COLOR_RESET}{RL_PROMPT_END} ") + return prompt def cmd_help(self, args): """Display available commands with enhanced formatting""" @@ -1463,7 +1949,93 @@ class ZDTTTerminal: def _execute_system_command(self, command): """Execute a system command with real-time I/O streaming.""" - shell_execute(self, command) + # Temporarily disable status bar updates during command execution + status_bar_was_running = self.status_bar_thread and self.status_bar_thread.is_alive() + + try: + # Start the process with direct stdin/stdout/stderr + process = subprocess.Popen( + command, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, # Merge stderr into stdout + stdin=sys.stdin, # Direct stdin passthrough + bufsize=1, # Line buffered + text=True, + cwd=self.current_dir + ) + + # Buffer for early output detection + early_output = [] + start_time = time_module.time() + check_timeout = 0.1 # 0.1 seconds + hide_output = False + output_buffer = [] + + # Read output in real-time + try: + while True: + # Read character by character for early detection + char = process.stdout.read(1) + if not char: + if process.poll() is not None: + break + time_module.sleep(0.01) + continue + + # Check for "command not found" in first 0.1 seconds + if time_module.time() - start_time < check_timeout: + early_output.append(char) + combined = ''.join(early_output).lower() + if 'command not found' in combined or 'not found:' in combined: + hide_output = True + # Consume remaining output silently + while process.poll() is None: + process.stdout.read(1) + break + + # Buffer output + output_buffer.append(char) + + # If we have a complete line or enough chars, flush + if char == '\n' or len(output_buffer) >= 1024: + if not hide_output: + sys.stdout.write(''.join(output_buffer)) + sys.stdout.flush() + output_buffer.clear() + + # Flush remaining buffer + if output_buffer and not hide_output: + sys.stdout.write(''.join(output_buffer)) + sys.stdout.flush() + + # Wait for process to finish + process.wait() + + except BrokenPipeError: + # Process closed stdout + pass + + except KeyboardInterrupt: + # Handle Ctrl+C + try: + if 'process' in locals(): + process.terminate() + process.wait(timeout=1) + except Exception: + try: + if 'process' in locals(): + process.kill() + except Exception: + pass + print("\n^C") + except Exception as e: + if not hide_output: + print(f"{self.COLOR_ERROR}Error executing command: {e}{self.COLOR_RESET}") + finally: + # Restore status bar if it was running + if status_bar_was_running: + self._render_status_bar() def execute_command(self, command_line): """Parse and execute a command""" diff --git a/version.txt b/version.txt index 16216ac..c44b9ff 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -0.1.2.a.9 \ No newline at end of file +0.1.2.b \ No newline at end of file diff --git a/zdtt/__init__.py b/zdtt/__init__.py deleted file mode 100644 index dc54eac..0000000 --- a/zdtt/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -# ZDTT package - - - diff --git a/zdtt/__pycache__/__init__.cpython-312.pyc b/zdtt/__pycache__/__init__.cpython-312.pyc deleted file mode 100644 index a5d8625..0000000 Binary files a/zdtt/__pycache__/__init__.cpython-312.pyc and /dev/null differ diff --git a/zdtt/__pycache__/config.cpython-312.pyc b/zdtt/__pycache__/config.cpython-312.pyc deleted file mode 100644 index d8701e9..0000000 Binary files a/zdtt/__pycache__/config.cpython-312.pyc and /dev/null differ diff --git a/zdtt/__pycache__/plugins.cpython-312.pyc b/zdtt/__pycache__/plugins.cpython-312.pyc deleted file mode 100644 index c60fabc..0000000 Binary files a/zdtt/__pycache__/plugins.cpython-312.pyc and /dev/null differ diff --git a/zdtt/__pycache__/shell.cpython-312.pyc b/zdtt/__pycache__/shell.cpython-312.pyc deleted file mode 100644 index a739182..0000000 Binary files a/zdtt/__pycache__/shell.cpython-312.pyc and /dev/null differ diff --git a/zdtt/__pycache__/status_bar.cpython-312.pyc b/zdtt/__pycache__/status_bar.cpython-312.pyc deleted file mode 100644 index c4fe83e..0000000 Binary files a/zdtt/__pycache__/status_bar.cpython-312.pyc and /dev/null differ diff --git a/zdtt/__pycache__/ui.cpython-312.pyc b/zdtt/__pycache__/ui.cpython-312.pyc deleted file mode 100644 index 5242472..0000000 Binary files a/zdtt/__pycache__/ui.cpython-312.pyc and /dev/null differ diff --git a/zdtt/config.py b/zdtt/config.py deleted file mode 100644 index b1a1575..0000000 --- a/zdtt/config.py +++ /dev/null @@ -1,166 +0,0 @@ -""" -System detection and persistent configuration helpers for ZDTT. -""" -import os -import sys -import shutil -import json - -SUPPORTED_DEBIAN_IDS = { - 'debian', 'ubuntu', 'linuxmint', 'mint', 'pop', 'pop-os', 'pop_os', - 'elementary', 'zorin', 'kali', 'parrot', 'mx', 'mx-linux', 'deepin', - 'peppermint', 'raspbian', 'neon', -} - -SUPPORTED_ARCH_IDS = { - 'arch', 'archlinux', 'manjaro', 'endeavouros', 'endeavour', 'arcolinux', - 'garuda', 'artix', 'blackarch', 'chakra', -} - - -def _parse_os_release(): - data = {} - try: - with open('/etc/os-release', 'r') as f: - for line in f: - line = line.strip() - if not line or line.startswith('#') or '=' not in line: - continue - key, value = line.split('=', 1) - value = value.strip().strip('"') - data[key] = value - except FileNotFoundError: - pass - return data - - -def _collect_tokens(*values): - tokens = set() - for value in values: - if not value: - continue - normalized = value.replace('"', '').strip().lower() - if not normalized: - continue - tokens.add(normalized) - delimiters_replaced = normalized.replace('-', ' ').replace('_', ' ') - for part in delimiters_replaced.split(): - if part: - tokens.add(part) - return tokens - - -def _detect_supported_distro(): - if os.path.exists('/etc/debian_version'): - return 'debian' - - arch_markers = ('/etc/arch-release', '/etc/artix-release') - if any(os.path.exists(path) for path in arch_markers): - return 'arch' - - os_release = _parse_os_release() - tokens = _collect_tokens(os_release.get('ID'), os_release.get('ID_LIKE')) - - if tokens & SUPPORTED_DEBIAN_IDS: - return 'debian' - if tokens & SUPPORTED_ARCH_IDS: - return 'arch' - - if shutil.which('apt-get'): - return 'debian' - if shutil.which('pacman'): - return 'arch' - return 'other' - - -def _prompt_distro_override(detected_distro): - label_map = { - 'debian': "Debian-based", - 'arch': "Arch-based", - 'other': "Unsupported/Other", - } - print("=" * 60) - print(f"Detected distribution: {label_map.get(detected_distro, 'Unknown')}") - print("If this is incorrect, enter one of: debian / arch / other.") - print("Press Enter to accept the detected value.") - override = input("Override distribution (leave blank to keep): ").strip().lower() - if override in ('debian', 'arch', 'other'): - return override - if override: - print(f"Unknown override '{override}'. Using detected value.") - return detected_distro - - -def _load_saved_distro(): - config_file = os.path.expanduser("~/.zdtt/config.json") - try: - with open(config_file, 'r') as f: - data = json.load(f) - saved_distro = data.get('distro') - if saved_distro in ('debian', 'arch', 'other'): - return saved_distro - except (FileNotFoundError, json.JSONDecodeError, KeyError): - pass - return None - - -def _save_distro_preference(distro: str): - config_file = os.path.expanduser("~/.zdtt/config.json") - os.makedirs(os.path.dirname(config_file), exist_ok=True) - data = {} - try: - with open(config_file, 'r') as f: - data = json.load(f) - except (FileNotFoundError, json.JSONDecodeError): - data = {} - data['distro'] = distro - with open(config_file, 'w') as f: - json.dump(data, f, indent=2) - - -def check_system_compatibility(): - """Detect supported distributions and warn when unsupported. Returns 'debian' | 'arch' | 'other'.""" - saved_distro = _load_saved_distro() - if saved_distro: - return saved_distro - - if sys.platform != 'linux': - print("=" * 60) - print("⚠️ WARNING: ZDTT Terminal is designed for Linux systems") - print(f" Detected platform: {sys.platform}") - print("=" * 60) - print("ZDTT may not work correctly on your system.") - print("Some features may be unavailable or broken.") - print() - response = input("Continue anyway? (yes/no): ").strip().lower() - if response != 'yes': - print("Installation cancelled.") - sys.exit(0) - distro = 'other' - _save_distro_preference(distro) - return distro - - distro = _detect_supported_distro() - if distro not in ('debian', 'arch'): - print("=" * 60) - print("⚠️ WARNING: Unsupported Distribution Detected") - print("=" * 60) - print("ZDTT Terminal is optimized for Debian-based and Arch Linux systems.") - print() - print("Running on your current system may result in:") - print(" • Some commands may not work as expected") - print(" • Auto-install features may fail") - print(" • Reduced plugin compatibility") - print(" • Package management commands unavailable") - print() - response = input("Continue installation? (yes/no): ").strip().lower() - if response != 'yes': - print("Installation cancelled.") - sys.exit(0) - - distro = _prompt_distro_override(distro) - _save_distro_preference(distro) - return distro - - - diff --git a/zdtt/plugins.py b/zdtt/plugins.py deleted file mode 100644 index 1fb9e3e..0000000 --- a/zdtt/plugins.py +++ /dev/null @@ -1,106 +0,0 @@ -""" -Plugin utilities for ZDTT: AST validation, quarantine, and command validation. -""" - -import os -import shutil -import ast -from typing import Dict, Callable, Iterable, Optional - -# Protected command names that plugins cannot override -PROTECTED_COMMANDS = { - 'ssh', 'sudo', 'su', 'cp', 'mv', 'rm', 'ls', 'cat', 'chmod', 'chown', - 'history', 'zps', 'zdtt', 'pip', 'python', 'python3', 'curl', 'wget' -} - - -def validate_plugin_ast(plugin_code: str, plugin_name: str) -> bool: - """ - Validate plugin AST to ensure no top-level code execution. - Only allows: imports, function definitions, class definitions, and docstrings. - Raises ValueError on violation. - """ - try: - tree = ast.parse(plugin_code) - except SyntaxError as e: - raise ValueError(f"Plugin has syntax errors: {e}") - - if not isinstance(tree, ast.Module): - raise ValueError("Plugin must be a valid Python module") - - for stmt in tree.body: - if isinstance(stmt, (ast.Import, ast.ImportFrom)): - continue - if isinstance(stmt, (ast.FunctionDef, ast.AsyncFunctionDef)): - continue - if isinstance(stmt, ast.ClassDef): - continue - if isinstance(stmt, ast.Expr): - # Allow docstring literals - if isinstance(stmt.value, (ast.Constant, ast.Str)): - if isinstance(stmt.value, ast.Constant): - if isinstance(stmt.value.value, str): - continue - else: - # ast.Str case (older Python) - continue - raise ValueError( - f"Plugin contains forbidden top-level statement: {stmt.__class__.__name__}. " - "Plugins can only contain imports, functions, classes, and docstrings. " - "No top-level code execution is allowed." - ) - - return True - - -def move_to_quarantine(plugin_file: str, quarantine_dir: str, logger) -> Optional[str]: - """ - Move a plugin file to quarantine directory and log the reason via caller. - Returns the final quarantine path or None on failure. - """ - plugin_name = os.path.basename(plugin_file) - os.makedirs(quarantine_dir, exist_ok=True) - - quarantine_path = os.path.join(quarantine_dir, plugin_name) - counter = 1 - while os.path.exists(quarantine_path): - name, ext = os.path.splitext(plugin_name) - quarantine_path = os.path.join(quarantine_dir, f"{name}_{counter}{ext}") - counter += 1 - - try: - shutil.move(plugin_file, quarantine_path) - logger.warning(f"Plugin '{plugin_name}' quarantined") - logger.warning(f"Moved to: {quarantine_path}") - return quarantine_path - except Exception as e: - logger.error(f"Failed to quarantine plugin '{plugin_name}': {e}") - return None - - -def validate_plugin_commands( - plugin_commands: Dict[str, Callable], - plugin_name: str, - protected_commands: Iterable[str] = PROTECTED_COMMANDS, -) -> bool: - """ - Ensure plugins do not override protected commands and values are callable. - Raises ValueError on violation. - """ - violations = [cmd for cmd in plugin_commands.keys() if cmd in protected_commands] - if violations: - raise ValueError( - f"Plugin attempted to override protected commands: {', '.join(violations)}. " - "This is a security violation and the plugin has been quarantined." - ) - - for cmd_name, cmd_func in plugin_commands.items(): - if not callable(cmd_func): - raise ValueError( - f"Plugin command '{cmd_name}' is not callable. All commands must be functions." - ) - - return True - - - diff --git a/zdtt/shell.py b/zdtt/shell.py deleted file mode 100644 index f354f06..0000000 --- a/zdtt/shell.py +++ /dev/null @@ -1,82 +0,0 @@ -""" -System shell command execution utilities for ZDTT. -""" -import sys -import subprocess -import time as time_module - - -def execute_system_command(terminal, command: str): - """Execute a system command with real-time I/O streaming.""" - status_bar_was_running = terminal.status_bar_thread and terminal.status_bar_thread.is_alive() - try: - process = subprocess.Popen( - command, - shell=True, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - stdin=sys.stdin, - bufsize=1, - text=True, - cwd=terminal.current_dir - ) - - early_output = [] - start_time = time_module.time() - check_timeout = 0.1 - hide_output = False - output_buffer = [] - - try: - while True: - char = process.stdout.read(1) - if not char: - if process.poll() is not None: - break - time_module.sleep(0.01) - continue - - if time_module.time() - start_time < check_timeout: - early_output.append(char) - combined = ''.join(early_output).lower() - if 'command not found' in combined or 'not found:' in combined: - hide_output = True - while process.poll() is None: - process.stdout.read(1) - break - - output_buffer.append(char) - if char == '\n' or len(output_buffer) >= 1024: - if not hide_output: - sys.stdout.write(''.join(output_buffer)) - sys.stdout.flush() - output_buffer.clear() - - if output_buffer and not hide_output: - sys.stdout.write(''.join(output_buffer)) - sys.stdout.flush() - - process.wait() - except BrokenPipeError: - pass - except KeyboardInterrupt: - try: - if 'process' in locals(): - process.terminate() - process.wait(timeout=1) - except Exception: - try: - if 'process' in locals(): - process.kill() - except Exception: - pass - print("\n^C") - except Exception as e: - if not locals().get('hide_output', False): - print(f"{terminal.COLOR_ERROR}Error executing command: {e}{terminal.COLOR_RESET}") - finally: - if status_bar_was_running: - terminal._render_status_bar() - - - diff --git a/zdtt/status_bar.py b/zdtt/status_bar.py deleted file mode 100644 index 11625b5..0000000 --- a/zdtt/status_bar.py +++ /dev/null @@ -1,154 +0,0 @@ -""" -Status bar, scroll region, and resize handling utilities for ZDTT. -All functions operate on the provided terminal instance. -""" -import sys -import shutil -from datetime import datetime - - -def set_scroll_region(terminal): - try: - rows = shutil.get_terminal_size().lines - rows = max(rows, 2) - sys.stdout.write(f"\033[2;{rows}r") - sys.stdout.write("\033[1;1H") - sys.stdout.write("\033[2K") - sys.stdout.write("\033[2;1H") - sys.stdout.flush() - terminal.scroll_region_set = True - except Exception: - terminal.scroll_region_set = False - - -def reset_scroll_region(terminal): - if not terminal.scroll_region_set: - return - sys.stdout.write("\033[r") - sys.stdout.flush() - terminal.scroll_region_set = False - - -def build_status_bar_text(terminal): - left_text = f"{terminal.COLOR_BOLD}ZDTT{terminal.COLOR_RESET} by {terminal.COLOR_BOLD}ZaneDev{terminal.COLOR_RESET}" - time_str = datetime.now().strftime("%I:%M %p") - plain_left = "ZDTT by ZaneDev" - plain_time = time_str - - try: - term_size = shutil.get_terminal_size() - width = max(1, term_size.columns) - except Exception: - width = max(1, len(plain_left) + len(plain_time) + 6) - - min_content_width = len(plain_left) + len(plain_time) + 5 - padding = 0 if width < min_content_width else width - min_content_width - separator = f"{terminal.COLOR_DIM}│{terminal.COLOR_RESET}" - bar_content = f" {left_text} {' ' * padding}{separator} {terminal.COLOR_BRIGHT_WHITE}{time_str}{terminal.COLOR_RESET} " - actual_display_len = len(plain_left) + len(plain_time) + padding + 5 - - if actual_display_len < width: - trailing_spaces = width - actual_display_len - bar_content = bar_content.rstrip() + ' ' * trailing_spaces - elif actual_display_len > width: - padding = max(0, width - min_content_width) - bar_content = f" {left_text} {' ' * padding}{separator} {terminal.COLOR_BRIGHT_WHITE}{time_str}{terminal.COLOR_RESET} " - actual_display_len = len(plain_left) + len(plain_time) + padding + 5 - if actual_display_len < width: - trailing_spaces = width - actual_display_len - bar_content = bar_content.rstrip() + ' ' * trailing_spaces - else: - if width < len(plain_left) + 10: - bar_content = f" {left_text} {separator} {terminal.COLOR_BRIGHT_WHITE}{time_str[:8]}{terminal.COLOR_RESET} " - bar_content = bar_content[:width] if len(bar_content) > width else bar_content - - bg_code, fg_code = terminal.STATUS_BAR_COLORS_LOOKUP() - result = f"\033[{bg_code}m\033[{fg_code}m{bar_content}\033[0m" - if len(result) > width * 2: - simple_bar = f" ZDTT by ZaneDev | {time_str} " - simple_bar = simple_bar[:width] if len(simple_bar) > width else simple_bar.ljust(width) - result = f"\033[{bg_code}m\033[{fg_code}m{simple_bar}\033[0m" - return result - - -def render_status_bar(terminal): - try: - try: - term_size = shutil.get_terminal_size() - max_width = term_size.columns - except Exception: - max_width = 80 - bar_text = build_status_bar_text(terminal) - if len(bar_text) > max_width * 3: - bar_text = build_status_bar_text(terminal) - sys.stdout.write("\033[s") - sys.stdout.write("\033[1;1H") - sys.stdout.write("\033[2K") - sys.stdout.write("\033[0m") - sys.stdout.write(bar_text) - sys.stdout.write("\033[0m") - sys.stdout.write(f"\033[{max_width}G") - sys.stdout.write("\033[u") - sys.stdout.flush() - except Exception: - pass - - -def status_bar_loop(terminal): - while not terminal.status_bar_stop_event.is_set(): - render_status_bar(terminal) - if terminal.status_bar_stop_event.wait(2): - break - - -def start_status_bar_thread(terminal): - if terminal.status_bar_thread and terminal.status_bar_thread.is_alive(): - return - terminal.status_bar_stop_event.clear() - terminal.status_bar_thread = terminal._spawn_thread(target=lambda: status_bar_loop(terminal), name="ZDTTStatusBar") - terminal.status_bar_thread.start() - - -def initialize_status_bar(terminal): - set_scroll_region(terminal) - start_status_bar_thread(terminal) - render_status_bar(terminal) - - -def shutdown_status_bar(terminal): - terminal.status_bar_stop_event.set() - if terminal.status_bar_thread and terminal.status_bar_thread.is_alive(): - terminal.status_bar_thread.join(timeout=0.5) - terminal.status_bar_thread = None - reset_scroll_region(terminal) - - -def handle_resize(terminal, signum=None, frame=None): - if not terminal.resize_lock.acquire(blocking=False): - return - try: - import time as time_module - time_module.sleep(0.05) - reset_scroll_region(terminal) - set_scroll_region(terminal) - try: - sys.stdout.write("\033[1;1H") - sys.stdout.write("\033[2K") - sys.stdout.write("\033[0m") - sys.stdout.flush() - except Exception: - pass - render_status_bar(terminal) - try: - term_size = shutil.get_terminal_size() - sys.stdout.write(f"\033[{term_size.lines};1H") - sys.stdout.flush() - except Exception: - pass - except Exception: - pass - finally: - terminal.resize_lock.release() - - - diff --git a/zdtt/ui.py b/zdtt/ui.py deleted file mode 100644 index 5eb1bb5..0000000 --- a/zdtt/ui.py +++ /dev/null @@ -1,85 +0,0 @@ -""" -UI helpers for ZDTT: banner, compatibility warning, and prompt. -""" -import os -import shutil - - -def display_banner(terminal): - print() - try: - term_size = shutil.get_terminal_size() - min_height = 13 if not terminal.is_supported else 11 - min_width = 44 - if term_size.columns < min_width or term_size.lines < min_height: - print(f"ZDTT Terminal v{terminal.version}") - if not terminal.is_supported: - _show_compatibility_warning(terminal) - print() - return - except Exception: - pass - - if os.path.exists(terminal.banner_file): - try: - with open(terminal.banner_file, 'r') as f: - custom_banner = f.read() - if '{version}' in custom_banner: - custom_banner = custom_banner.replace('{version}', terminal.version) - print(custom_banner) - if not terminal.is_supported: - _show_compatibility_warning(terminal) - return - except Exception as e: - import logging - logging.error(f"Failed to load custom banner: {e}") - - banner = f""" -░█████████ ░███████ ░██████████░██████████ - ░██ ░██ ░██ ░██ ░██ - ░██ ░██ ░██ ░██ ░██ - ░███ ░██ ░██ ░██ ░██ - ░██ ░██ ░██ ░██ ░██ - ░██ ░██ ░██ ░██ ░██ -░█████████ ░███████ ░██ ░██ - - -ZDTT Terminal v{terminal.version} -""" - print(banner) - if not terminal.is_supported: - _show_compatibility_warning(terminal) - - -def _show_compatibility_warning(terminal): - if terminal.is_supported: - return - print() - print("⚠️ Running on unsupported system - limited support") - print(" Tested on Debian-based and Arch Linux distributions.") - print() - - -def get_prompt(terminal): - cwd = os.getcwd() - home = os.path.expanduser("~") - if cwd.startswith(home): - display_path = "~" + cwd[len(home):] - else: - display_path = cwd - - RL_PROMPT_START = '\001' - RL_PROMPT_END = '\002' - prompt = (f"{RL_PROMPT_START}{terminal.COLOR_BRIGHT_CYAN}{RL_PROMPT_END}┌─{RL_PROMPT_START}{terminal.COLOR_RESET}{RL_PROMPT_END}" - f"[{RL_PROMPT_START}{terminal.COLOR_BRIGHT_GREEN}{RL_PROMPT_END}{terminal.username}" - f"{RL_PROMPT_START}{terminal.COLOR_RESET}{RL_PROMPT_END}" - f"{RL_PROMPT_START}{terminal.COLOR_BRIGHT_WHITE}{RL_PROMPT_END}@{RL_PROMPT_START}{terminal.COLOR_RESET}{RL_PROMPT_END}" - f"{RL_PROMPT_START}{terminal.COLOR_BRIGHT_CYAN}{RL_PROMPT_END}ZDTT{RL_PROMPT_START}{terminal.COLOR_RESET}{RL_PROMPT_END} " - f"{RL_PROMPT_START}{terminal.COLOR_BRIGHT_BLUE}{RL_PROMPT_END}{display_path}" - f"{RL_PROMPT_START}{terminal.COLOR_RESET}{RL_PROMPT_END}]" - f"{RL_PROMPT_START}{terminal.COLOR_BRIGHT_CYAN}{RL_PROMPT_END}─{RL_PROMPT_START}{terminal.COLOR_RESET}{RL_PROMPT_END}\n" - f"{RL_PROMPT_START}{terminal.COLOR_BRIGHT_CYAN}{RL_PROMPT_END}└─{RL_PROMPT_START}{terminal.COLOR_BRIGHT_MAGENTA}{RL_PROMPT_END}➜{RL_PROMPT_START}{terminal.COLOR_RESET}{RL_PROMPT_END} ") - return prompt - - -