Update release version to v0.1.2.b and refactor terminal.py for improved functionality

- Updated the release version in index.html and version.txt to v0.1.2.b.
- Refactored terminal.py to consolidate system detection and plugin management functionalities, enhancing code organization and maintainability.
- Removed deprecated modules and streamlined the command execution process for better performance.
This commit is contained in:
2025-11-16 11:11:18 -05:00
parent 7f06853544
commit c9b2be1724
15 changed files with 641 additions and 666 deletions

View File

@@ -46,7 +46,7 @@
<div class="hero__metrics">
<div class="metric">
<p class="metric__label">Current release</p>
<p class="metric__value">v0.1.2.a.9</p>
<p class="metric__value">v0.1.2.b</p>
</div>
<div class="metric">
<p class="metric__label">Supported families</p>

View File

@@ -23,39 +23,39 @@ import urllib.request
import urllib.error
import time as time_module
# Ensure local 'zdtt' package is importable both from repo and installed locations
try:
# Probe a quick import to determine availability
import zdtt # type: ignore
except Exception:
script_dir = os.path.dirname(os.path.abspath(__file__))
candidates = [
script_dir,
os.path.abspath(os.path.join(script_dir, '..')),
os.path.join(script_dir, 'zdtt'),
'/home/zane/ZDTT',
]
for path_candidate in candidates:
if path_candidate and os.path.isdir(path_candidate) and path_candidate not in sys.path:
sys.path.insert(0, path_candidate)
# Best-effort: ignore failure here; regular imports will raise if still missing
try:
import zdtt # type: ignore
except Exception:
pass
from zdtt.plugins import (
PROTECTED_COMMANDS as PLUGIN_PROTECTED_COMMANDS,
validate_plugin_ast as plugins_validate_ast,
validate_plugin_commands as plugins_validate_commands,
move_to_quarantine as plugins_move_to_quarantine,
)
from zdtt.config import (
check_system_compatibility,
)
from zdtt import status_bar as sb
from zdtt.shell import execute_system_command as shell_execute
from zdtt.ui import display_banner as ui_display_banner, get_prompt as ui_get_prompt
SUPPORTED_DEBIAN_IDS = {
'debian',
'ubuntu',
'linuxmint',
'mint',
'pop',
'pop-os',
'pop_os',
'elementary',
'zorin',
'kali',
'parrot',
'mx',
'mx-linux',
'deepin',
'peppermint',
'raspbian',
'neon',
}
SUPPORTED_ARCH_IDS = {
'arch',
'archlinux',
'manjaro',
'endeavouros',
'endeavour',
'arcolinux',
'garuda',
'artix',
'blackarch',
'chakra',
}
STATUS_BAR_COLORS = {
'blue': ('44', '97'),
@@ -68,15 +68,182 @@ STATUS_BAR_COLORS = {
'black': ('40', '97'),
}
# Protected command names that plugins cannot override (sourced from plugins module)
PROTECTED_COMMANDS = PLUGIN_PROTECTED_COMMANDS
# Helper for status bar module to get current color codes
def _STATUS_BAR_COLORS_LOOKUP(self):
return STATUS_BAR_COLORS.get(self.status_bar_color, ('44', '97'))
# Protected command names that plugins cannot override
PROTECTED_COMMANDS = {
'ssh', 'sudo', 'su', 'cp', 'mv', 'rm', 'ls', 'cat', 'chmod', 'chown',
'history', 'zps', 'zdtt', 'pip', 'python', 'python3', 'curl', 'wget'
}
## moved to zdtt.config
def _parse_os_release():
"""Return a dict of fields from /etc/os-release if available"""
data = {}
try:
with open('/etc/os-release', 'r') as f:
for line in f:
line = line.strip()
if not line or line.startswith('#') or '=' not in line:
continue
key, value = line.split('=', 1)
value = value.strip().strip('"')
data[key] = value
except FileNotFoundError:
pass
return data
def _collect_tokens(*values):
"""Normalize distro identifiers for comparison"""
tokens = set()
for value in values:
if not value:
continue
normalized = value.replace('"', '').strip().lower()
if not normalized:
continue
# Keep the raw normalized value plus its dashed/underscored variants split
tokens.add(normalized)
delimiters_replaced = normalized.replace('-', ' ').replace('_', ' ')
for part in delimiters_replaced.split():
if part:
tokens.add(part)
return tokens
def _detect_supported_distro():
"""Return distro identifier: 'debian', 'arch', or 'other'"""
if os.path.exists('/etc/debian_version'):
return 'debian'
arch_markers = (
'/etc/arch-release',
'/etc/artix-release',
)
if any(os.path.exists(path) for path in arch_markers):
return 'arch'
os_release = _parse_os_release()
tokens = _collect_tokens(os_release.get('ID'), os_release.get('ID_LIKE'))
if tokens & SUPPORTED_DEBIAN_IDS:
return 'debian'
if tokens & SUPPORTED_ARCH_IDS:
return 'arch'
# Fallback to package manager detection
if shutil.which('apt-get'):
return 'debian'
if shutil.which('pacman'):
return 'arch'
return 'other'
def _prompt_distro_override(detected_distro):
"""Allow user to override detected distro."""
label_map = {
'debian': "Debian-based",
'arch': "Arch-based",
'other': "Unsupported/Other",
}
print("=" * 60)
print(f"Detected distribution: {label_map.get(detected_distro, 'Unknown')}")
print("If this is incorrect, enter one of: debian / arch / other.")
print("Press Enter to accept the detected value.")
override = input("Override distribution (leave blank to keep): ").strip().lower()
if override in ('debian', 'arch', 'other'):
return override
if override:
print(f"Unknown override '{override}'. Using detected value.")
return detected_distro
def _load_saved_distro():
"""Load saved distro preference from config file."""
config_file = os.path.expanduser("~/.zdtt/config.json")
try:
with open(config_file, 'r') as f:
data = json.load(f)
saved_distro = data.get('distro')
if saved_distro in ('debian', 'arch', 'other'):
return saved_distro
except (FileNotFoundError, json.JSONDecodeError, KeyError):
pass
return None
def _save_distro_preference(distro):
"""Save distro preference to config file."""
config_file = os.path.expanduser("~/.zdtt/config.json")
os.makedirs(os.path.dirname(config_file), exist_ok=True)
data = {}
try:
with open(config_file, 'r') as f:
data = json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
data = {}
data['distro'] = distro
with open(config_file, 'w') as f:
json.dump(data, f, indent=2)
def check_system_compatibility():
"""Detect supported distributions and warn when unsupported"""
# Check for saved distro preference first
saved_distro = _load_saved_distro()
if saved_distro:
# Use saved preference, skip prompts
return saved_distro
# Check if running on Linux
if sys.platform != 'linux':
print("=" * 60)
print("⚠️ WARNING: ZDTT Terminal is designed for Linux systems")
print(f" Detected platform: {sys.platform}")
print("=" * 60)
print("ZDTT may not work correctly on your system.")
print("Some features may be unavailable or broken.")
print()
response = input("Continue anyway? (yes/no): ").strip().lower()
if response != 'yes':
print("Installation cancelled.")
sys.exit(0)
distro = 'other'
_save_distro_preference(distro)
return distro
# Detect supported distributions
distro = _detect_supported_distro()
if distro not in ('debian', 'arch'):
# Unsupported distribution
print("=" * 60)
print("⚠️ WARNING: Unsupported Distribution Detected")
print("=" * 60)
print("ZDTT Terminal is optimized for Debian-based and Arch Linux systems.")
print()
print("Running on your current system may result in:")
print(" • Some commands may not work as expected")
print(" • Auto-install features may fail")
print(" • Reduced plugin compatibility")
print(" • Package management commands unavailable")
print()
response = input("Continue installation? (yes/no): ").strip().lower()
if response != 'yes':
print("Installation cancelled.")
sys.exit(0)
# Offer override regardless of detection
distro = _prompt_distro_override(distro)
# Save the selected distro preference
_save_distro_preference(distro)
return distro
class ZDTTTerminal:
@@ -106,8 +273,6 @@ class ZDTTTerminal:
self.safe_mode = False # Safe mode flag (no plugins loaded)
self.quarantine_warnings = [] # Store warnings for plugins quarantined at startup
self.trusted_plugins = set() # Plugins allowed to use imports
# Expose STATUS_BAR_COLORS lookup to status bar module
self.STATUS_BAR_COLORS_LOOKUP = lambda: _STATUS_BAR_COLORS_LOOKUP(self)
# Setup logging for plugins
self.setup_logging()
@@ -310,49 +475,272 @@ class ZDTTTerminal:
def display_banner(self):
"""Display the ZDTT ASCII art banner (or custom banner if available)"""
ui_display_banner(self)
print()
# Check terminal size to see if banner will fit
try:
term_size = shutil.get_terminal_size()
# Banner is 44 chars wide and 11 lines tall (including version)
# Add extra space for compatibility warning if needed
min_height = 13 if not self.is_supported else 11
min_width = 44
if term_size.columns < min_width or term_size.lines < min_height:
# Terminal too small, skip banner and just show minimal header
print(f"ZDTT Terminal v{self.version}")
if not self.is_supported:
print("⚠️ Unsupported system - limited support")
print()
return
except Exception:
# If we can't get terminal size, display the banner anyway
pass
# Check for custom banner
if os.path.exists(self.banner_file):
try:
with open(self.banner_file, 'r') as f:
custom_banner = f.read()
# Add version at the bottom if not already present
if '{version}' in custom_banner:
custom_banner = custom_banner.replace('{version}', self.version)
print(custom_banner)
# Show warning for unsupported systems
if not self.is_supported:
self._show_compatibility_warning()
return
except Exception as e:
logging.error(f"Failed to load custom banner: {e}")
# Fall through to default banner
# Default banner
banner = f"""
░█████████ ░███████ ░██████████░██████████
░██ ░██ ░██ ░██ ░██
░██ ░██ ░██ ░██ ░██
░███ ░██ ░██ ░██ ░██
░██ ░██ ░██ ░██ ░██
░██ ░██ ░██ ░██ ░██
░█████████ ░███████ ░██ ░██
ZDTT Terminal v{self.version}
"""
print(banner)
# Show warning for unsupported systems after banner
if not self.is_supported:
self._show_compatibility_warning()
def _show_compatibility_warning(self):
"""Show compatibility warning for unsupported systems"""
from zdtt.ui import _show_compatibility_warning as ui_warn
ui_warn(self)
if self.is_supported:
return
print()
print("⚠️ Running on unsupported system - limited support")
print(" Tested on Debian-based and Arch Linux distributions.")
print()
def initialize_status_bar(self):
"""Reserve the first terminal row and start the status bar thread."""
sb.initialize_status_bar(self)
self._set_scroll_region()
self._start_status_bar_thread()
self._render_status_bar()
def shutdown_status_bar(self):
"""Stop the status bar thread and release terminal state."""
sb.shutdown_status_bar(self)
self.status_bar_stop_event.set()
if self.status_bar_thread and self.status_bar_thread.is_alive():
self.status_bar_thread.join(timeout=0.5)
self.status_bar_thread = None
self._reset_scroll_region()
def _start_status_bar_thread(self):
sb.start_status_bar_thread(self)
if self.status_bar_thread and self.status_bar_thread.is_alive():
return
self.status_bar_stop_event.clear()
self.status_bar_thread = threading.Thread(
target=self._status_bar_loop,
name="ZDTTStatusBar",
daemon=True,
)
self.status_bar_thread.start()
def _status_bar_loop(self):
sb.status_bar_loop(self)
while not self.status_bar_stop_event.is_set():
self._render_status_bar()
if self.status_bar_stop_event.wait(2):
break
def _render_status_bar(self):
"""Render a single-line status bar with branding and time."""
sb.render_status_bar(self)
try:
# Get terminal size first to ensure we don't write beyond bounds
try:
term_size = shutil.get_terminal_size()
max_width = term_size.columns
except Exception:
max_width = 80 # Fallback
bar_text = self._build_status_bar_text()
# Ensure bar_text doesn't exceed terminal width (safety check)
# Count visible characters (approximate - ANSI codes don't count)
# This is a rough check, but better than nothing
if len(bar_text) > max_width * 3: # Allow for ANSI codes (rough estimate)
# Rebuild with safer width
bar_text = self._build_status_bar_text()
sys.stdout.write("\033[s") # Save cursor position
sys.stdout.write("\033[1;1H") # Move to first row, first column
sys.stdout.write("\033[2K") # Clear the entire line
sys.stdout.write("\033[0m") # Reset all attributes
sys.stdout.write(bar_text)
sys.stdout.write("\033[0m") # Ensure reset at end
# Move cursor to end of line to prevent wrapping issues
sys.stdout.write(f"\033[{max_width}G") # Move to column max_width
sys.stdout.write("\033[u") # Restore cursor
sys.stdout.flush()
except Exception:
# Fallback: just skip rendering if there's an error
pass
def _build_status_bar_text(self):
"""Render a single-line status bar with enhanced branding and time."""
return sb.build_status_bar_text(self)
left_text = f"{self.COLOR_BOLD}ZDTT{self.COLOR_RESET} by {self.COLOR_BOLD}ZaneDev{self.COLOR_RESET}"
time_str = datetime.now().strftime("%I:%M %p")
plain_left = "ZDTT by ZaneDev"
plain_time = time_str
try:
# Always get fresh terminal size to handle resizes
term_size = shutil.get_terminal_size()
width = term_size.columns
# Safety: ensure width is at least 1
width = max(1, width)
except Exception:
# Fallback to minimum width if we can't get terminal size
width = max(1, len(plain_left) + len(plain_time) + 6)
# Calculate the minimum content width (plain text only, no ANSI codes)
# Format: " ZDTT by ZaneDev | TIME "
min_content_width = len(plain_left) + len(plain_time) + 5 # 5 = spaces + separator
# Calculate padding to fill the line
if width < min_content_width:
# Terminal too narrow, use minimum padding
padding = 0
else:
padding = width - min_content_width
# Build the content (plain text calculation)
separator = f"{self.COLOR_DIM}{self.COLOR_RESET}"
bar_content = f" {left_text} {' ' * padding}{separator} {self.COLOR_BRIGHT_WHITE}{time_str}{self.COLOR_RESET} "
# Calculate actual display length (plain text only)
actual_display_len = len(plain_left) + len(plain_time) + padding + 5
# Ensure we fill exactly to terminal width (but never exceed it)
if actual_display_len < width:
# Add trailing spaces to fill exactly to width
trailing_spaces = width - actual_display_len
bar_content = bar_content.rstrip() + ' ' * trailing_spaces
elif actual_display_len > width:
# We exceeded width, recalculate with less padding
padding = max(0, width - min_content_width)
bar_content = f" {left_text} {' ' * padding}{separator} {self.COLOR_BRIGHT_WHITE}{time_str}{self.COLOR_RESET} "
actual_display_len = len(plain_left) + len(plain_time) + padding + 5
if actual_display_len < width:
trailing_spaces = width - actual_display_len
bar_content = bar_content.rstrip() + ' ' * trailing_spaces
else:
# Still too wide, trim the time if necessary
if width < len(plain_left) + 10:
# Very narrow terminal, just show minimal content
bar_content = f" {left_text} {separator} {self.COLOR_BRIGHT_WHITE}{time_str[:8]}{self.COLOR_RESET} "
bar_content = bar_content[:width] if len(bar_content) > width else bar_content
# Final safety check: ensure we never exceed terminal width
# This is approximate since ANSI codes don't count, but better than nothing
bg_code, fg_code = STATUS_BAR_COLORS.get(self.status_bar_color, ('44', '97'))
result = f"\033[{bg_code}m\033[{fg_code}m{bar_content}\033[0m"
# If the result is suspiciously long, truncate it
# (rough heuristic: ANSI codes add ~30-50 chars, so if result > width*2, it's probably wrong)
if len(result) > width * 2:
# Emergency fallback: simple status bar
simple_bar = f" ZDTT by ZaneDev | {time_str} "
simple_bar = simple_bar[:width] if len(simple_bar) > width else simple_bar.ljust(width)
result = f"\033[{bg_code}m\033[{fg_code}m{simple_bar}\033[0m"
return result
def _set_scroll_region(self):
"""Reserve the top row for the status bar."""
sb.set_scroll_region(self)
try:
rows = shutil.get_terminal_size().lines
rows = max(rows, 2)
sys.stdout.write(f"\033[2;{rows}r")
sys.stdout.write("\033[1;1H")
sys.stdout.write("\033[2K")
sys.stdout.write("\033[2;1H")
sys.stdout.flush()
self.scroll_region_set = True
except Exception:
self.scroll_region_set = False
def _reset_scroll_region(self):
"""Restore default scrolling behavior."""
sb.reset_scroll_region(self)
if not self.scroll_region_set:
return
sys.stdout.write("\033[r")
sys.stdout.flush()
self.scroll_region_set = False
def _handle_resize(self, signum=None, frame=None):
"""Handle terminal resize event (SIGWINCH)."""
sb.handle_resize(self, signum, frame)
def _spawn_thread(self, target, name):
return threading.Thread(target=target, name=name, daemon=True)
# Use a lock to prevent race conditions
if not self.resize_lock.acquire(blocking=False):
# If we can't acquire the lock immediately, skip this resize
# (another resize is already being handled)
return
try:
# Small delay to let terminal settle after resize
time_module.sleep(0.05)
# Reset scroll region first to clear any corrupted state
self._reset_scroll_region()
# Update scroll region with new terminal size
self._set_scroll_region()
# Clear the status bar line completely before redrawing
try:
sys.stdout.write("\033[1;1H") # Move to first row
sys.stdout.write("\033[2K") # Clear the entire line
sys.stdout.write("\033[0m") # Reset attributes
sys.stdout.flush()
except Exception:
pass
# Force immediate status bar refresh
self._render_status_bar()
# Ensure cursor is in a safe position
try:
term_size = shutil.get_terminal_size()
sys.stdout.write(f"\033[{term_size.lines};1H") # Move to last line, first column
sys.stdout.flush()
except Exception:
pass
except Exception:
# Silently fail if resize handling fails
pass
finally:
self.resize_lock.release()
def setup_readline(self):
"""Setup readline for history and tab completion"""
@@ -412,19 +800,92 @@ class ZDTTTerminal:
return None
def _validate_plugin_ast(self, plugin_code, plugin_name):
# Delegate to plugins module
return plugins_validate_ast(plugin_code, plugin_name)
"""
Validate plugin AST to ensure no top-level code execution.
Only allows: imports, function definitions, class definitions, and docstrings.
"""
try:
tree = ast.parse(plugin_code)
except SyntaxError as e:
raise ValueError(f"Plugin has syntax errors: {e}")
# Check module body directly - this is the most reliable way
if not isinstance(tree, ast.Module):
raise ValueError("Plugin must be a valid Python module")
for stmt in tree.body:
# Allow imports
if isinstance(stmt, (ast.Import, ast.ImportFrom)):
continue
# Allow function definitions
if isinstance(stmt, (ast.FunctionDef, ast.AsyncFunctionDef)):
continue
# Allow class definitions
if isinstance(stmt, ast.ClassDef):
continue
# Allow docstrings (Expr with string constant)
if isinstance(stmt, ast.Expr):
# Check if it's a string literal (docstring)
if isinstance(stmt.value, (ast.Constant, ast.Str)):
# For Python 3.8+, use ast.Constant; for older versions, use ast.Str
if isinstance(stmt.value, ast.Constant):
if isinstance(stmt.value.value, str):
continue
elif isinstance(stmt.value, ast.Str):
continue
# Anything else is forbidden (assignments, function calls, loops, etc.)
raise ValueError(
f"Plugin contains forbidden top-level statement: {stmt.__class__.__name__}. "
"Plugins can only contain imports, functions, classes, and docstrings. "
"No top-level code execution is allowed."
)
return True
def _move_to_quarantine(self, plugin_file, reason):
# Delegate to plugins module
path = plugins_move_to_quarantine(plugin_file, self.quarantine_dir, logging)
if path:
logging.warning(f"Reason: {reason}")
return path
"""Move a plugin file to quarantine directory and log the reason."""
plugin_name = os.path.basename(plugin_file)
os.makedirs(self.quarantine_dir, exist_ok=True)
# Create unique filename if already exists
quarantine_path = os.path.join(self.quarantine_dir, plugin_name)
counter = 1
while os.path.exists(quarantine_path):
name, ext = os.path.splitext(plugin_name)
quarantine_path = os.path.join(self.quarantine_dir, f"{name}_{counter}{ext}")
counter += 1
try:
shutil.move(plugin_file, quarantine_path)
logging.warning(f"Plugin '{plugin_name}' quarantined: {reason}")
logging.warning(f"Moved to: {quarantine_path}")
return quarantine_path
except Exception as e:
logging.error(f"Failed to quarantine plugin '{plugin_name}': {e}")
return None
def _validate_plugin_commands(self, plugin_commands, plugin_name):
# Delegate to plugins module
return plugins_validate_commands(plugin_commands, plugin_name, PROTECTED_COMMANDS)
"""Validate that plugin commands don't override protected commands."""
violations = []
for cmd_name in plugin_commands.keys():
if cmd_name in PROTECTED_COMMANDS:
violations.append(cmd_name)
if violations:
raise ValueError(
f"Plugin attempted to override protected commands: {', '.join(violations)}. "
"This is a security violation and the plugin has been quarantined."
)
# Validate that all values are callable
for cmd_name, cmd_func in plugin_commands.items():
if not callable(cmd_func):
raise ValueError(
f"Plugin command '{cmd_name}' is not callable. "
"All commands must be functions."
)
return True
def load_plugins(self):
"""Load plugin commands from the plugins directory with security validation."""
@@ -667,7 +1128,32 @@ class ZDTTTerminal:
def get_prompt(self):
"""Return the custom prompt string with enhanced colors"""
return ui_get_prompt(self)
# Show current directory in prompt
cwd = os.getcwd()
# Show ~ for home directory
home = os.path.expanduser("~")
if cwd.startswith(home):
display_path = "~" + cwd[len(home):]
else:
display_path = cwd
# Wrap ANSI codes in \001 and \002 so readline knows they're non-printable
# This fixes line wrapping issues with long commands
RL_PROMPT_START = '\001'
RL_PROMPT_END = '\002'
# Create enhanced colorized prompt with gradient-like effect
# [username in bright green @ ZDTT in bright cyan path in bright blue]=>
prompt = (f"{RL_PROMPT_START}{self.COLOR_BRIGHT_CYAN}{RL_PROMPT_END}┌─{RL_PROMPT_START}{self.COLOR_RESET}{RL_PROMPT_END}"
f"[{RL_PROMPT_START}{self.COLOR_BRIGHT_GREEN}{RL_PROMPT_END}{self.username}"
f"{RL_PROMPT_START}{self.COLOR_RESET}{RL_PROMPT_END}"
f"{RL_PROMPT_START}{self.COLOR_BRIGHT_WHITE}{RL_PROMPT_END}@{RL_PROMPT_START}{self.COLOR_RESET}{RL_PROMPT_END}"
f"{RL_PROMPT_START}{self.COLOR_BRIGHT_CYAN}{RL_PROMPT_END}ZDTT{RL_PROMPT_START}{self.COLOR_RESET}{RL_PROMPT_END} "
f"{RL_PROMPT_START}{self.COLOR_BRIGHT_BLUE}{RL_PROMPT_END}{display_path}"
f"{RL_PROMPT_START}{self.COLOR_RESET}{RL_PROMPT_END}]"
f"{RL_PROMPT_START}{self.COLOR_BRIGHT_CYAN}{RL_PROMPT_END}{RL_PROMPT_START}{self.COLOR_RESET}{RL_PROMPT_END}\n"
f"{RL_PROMPT_START}{self.COLOR_BRIGHT_CYAN}{RL_PROMPT_END}└─{RL_PROMPT_START}{self.COLOR_BRIGHT_MAGENTA}{RL_PROMPT_END}{RL_PROMPT_START}{self.COLOR_RESET}{RL_PROMPT_END} ")
return prompt
def cmd_help(self, args):
"""Display available commands with enhanced formatting"""
@@ -1463,7 +1949,93 @@ class ZDTTTerminal:
def _execute_system_command(self, command):
"""Execute a system command with real-time I/O streaming."""
shell_execute(self, command)
# Temporarily disable status bar updates during command execution
status_bar_was_running = self.status_bar_thread and self.status_bar_thread.is_alive()
try:
# Start the process with direct stdin/stdout/stderr
process = subprocess.Popen(
command,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, # Merge stderr into stdout
stdin=sys.stdin, # Direct stdin passthrough
bufsize=1, # Line buffered
text=True,
cwd=self.current_dir
)
# Buffer for early output detection
early_output = []
start_time = time_module.time()
check_timeout = 0.1 # 0.1 seconds
hide_output = False
output_buffer = []
# Read output in real-time
try:
while True:
# Read character by character for early detection
char = process.stdout.read(1)
if not char:
if process.poll() is not None:
break
time_module.sleep(0.01)
continue
# Check for "command not found" in first 0.1 seconds
if time_module.time() - start_time < check_timeout:
early_output.append(char)
combined = ''.join(early_output).lower()
if 'command not found' in combined or 'not found:' in combined:
hide_output = True
# Consume remaining output silently
while process.poll() is None:
process.stdout.read(1)
break
# Buffer output
output_buffer.append(char)
# If we have a complete line or enough chars, flush
if char == '\n' or len(output_buffer) >= 1024:
if not hide_output:
sys.stdout.write(''.join(output_buffer))
sys.stdout.flush()
output_buffer.clear()
# Flush remaining buffer
if output_buffer and not hide_output:
sys.stdout.write(''.join(output_buffer))
sys.stdout.flush()
# Wait for process to finish
process.wait()
except BrokenPipeError:
# Process closed stdout
pass
except KeyboardInterrupt:
# Handle Ctrl+C
try:
if 'process' in locals():
process.terminate()
process.wait(timeout=1)
except Exception:
try:
if 'process' in locals():
process.kill()
except Exception:
pass
print("\n^C")
except Exception as e:
if not hide_output:
print(f"{self.COLOR_ERROR}Error executing command: {e}{self.COLOR_RESET}")
finally:
# Restore status bar if it was running
if status_bar_was_running:
self._render_status_bar()
def execute_command(self, command_line):
"""Parse and execute a command"""

View File

@@ -1 +1 @@
0.1.2.a.9
0.1.2.b

View File

@@ -1,4 +0,0 @@
# ZDTT package

Binary file not shown.

View File

@@ -1,166 +0,0 @@
"""
System detection and persistent configuration helpers for ZDTT.
"""
import os
import sys
import shutil
import json
SUPPORTED_DEBIAN_IDS = {
'debian', 'ubuntu', 'linuxmint', 'mint', 'pop', 'pop-os', 'pop_os',
'elementary', 'zorin', 'kali', 'parrot', 'mx', 'mx-linux', 'deepin',
'peppermint', 'raspbian', 'neon',
}
SUPPORTED_ARCH_IDS = {
'arch', 'archlinux', 'manjaro', 'endeavouros', 'endeavour', 'arcolinux',
'garuda', 'artix', 'blackarch', 'chakra',
}
def _parse_os_release():
data = {}
try:
with open('/etc/os-release', 'r') as f:
for line in f:
line = line.strip()
if not line or line.startswith('#') or '=' not in line:
continue
key, value = line.split('=', 1)
value = value.strip().strip('"')
data[key] = value
except FileNotFoundError:
pass
return data
def _collect_tokens(*values):
tokens = set()
for value in values:
if not value:
continue
normalized = value.replace('"', '').strip().lower()
if not normalized:
continue
tokens.add(normalized)
delimiters_replaced = normalized.replace('-', ' ').replace('_', ' ')
for part in delimiters_replaced.split():
if part:
tokens.add(part)
return tokens
def _detect_supported_distro():
if os.path.exists('/etc/debian_version'):
return 'debian'
arch_markers = ('/etc/arch-release', '/etc/artix-release')
if any(os.path.exists(path) for path in arch_markers):
return 'arch'
os_release = _parse_os_release()
tokens = _collect_tokens(os_release.get('ID'), os_release.get('ID_LIKE'))
if tokens & SUPPORTED_DEBIAN_IDS:
return 'debian'
if tokens & SUPPORTED_ARCH_IDS:
return 'arch'
if shutil.which('apt-get'):
return 'debian'
if shutil.which('pacman'):
return 'arch'
return 'other'
def _prompt_distro_override(detected_distro):
label_map = {
'debian': "Debian-based",
'arch': "Arch-based",
'other': "Unsupported/Other",
}
print("=" * 60)
print(f"Detected distribution: {label_map.get(detected_distro, 'Unknown')}")
print("If this is incorrect, enter one of: debian / arch / other.")
print("Press Enter to accept the detected value.")
override = input("Override distribution (leave blank to keep): ").strip().lower()
if override in ('debian', 'arch', 'other'):
return override
if override:
print(f"Unknown override '{override}'. Using detected value.")
return detected_distro
def _load_saved_distro():
config_file = os.path.expanduser("~/.zdtt/config.json")
try:
with open(config_file, 'r') as f:
data = json.load(f)
saved_distro = data.get('distro')
if saved_distro in ('debian', 'arch', 'other'):
return saved_distro
except (FileNotFoundError, json.JSONDecodeError, KeyError):
pass
return None
def _save_distro_preference(distro: str):
config_file = os.path.expanduser("~/.zdtt/config.json")
os.makedirs(os.path.dirname(config_file), exist_ok=True)
data = {}
try:
with open(config_file, 'r') as f:
data = json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
data = {}
data['distro'] = distro
with open(config_file, 'w') as f:
json.dump(data, f, indent=2)
def check_system_compatibility():
"""Detect supported distributions and warn when unsupported. Returns 'debian' | 'arch' | 'other'."""
saved_distro = _load_saved_distro()
if saved_distro:
return saved_distro
if sys.platform != 'linux':
print("=" * 60)
print("⚠️ WARNING: ZDTT Terminal is designed for Linux systems")
print(f" Detected platform: {sys.platform}")
print("=" * 60)
print("ZDTT may not work correctly on your system.")
print("Some features may be unavailable or broken.")
print()
response = input("Continue anyway? (yes/no): ").strip().lower()
if response != 'yes':
print("Installation cancelled.")
sys.exit(0)
distro = 'other'
_save_distro_preference(distro)
return distro
distro = _detect_supported_distro()
if distro not in ('debian', 'arch'):
print("=" * 60)
print("⚠️ WARNING: Unsupported Distribution Detected")
print("=" * 60)
print("ZDTT Terminal is optimized for Debian-based and Arch Linux systems.")
print()
print("Running on your current system may result in:")
print(" • Some commands may not work as expected")
print(" • Auto-install features may fail")
print(" • Reduced plugin compatibility")
print(" • Package management commands unavailable")
print()
response = input("Continue installation? (yes/no): ").strip().lower()
if response != 'yes':
print("Installation cancelled.")
sys.exit(0)
distro = _prompt_distro_override(distro)
_save_distro_preference(distro)
return distro

View File

@@ -1,106 +0,0 @@
"""
Plugin utilities for ZDTT: AST validation, quarantine, and command validation.
"""
import os
import shutil
import ast
from typing import Dict, Callable, Iterable, Optional
# Protected command names that plugins cannot override
PROTECTED_COMMANDS = {
'ssh', 'sudo', 'su', 'cp', 'mv', 'rm', 'ls', 'cat', 'chmod', 'chown',
'history', 'zps', 'zdtt', 'pip', 'python', 'python3', 'curl', 'wget'
}
def validate_plugin_ast(plugin_code: str, plugin_name: str) -> bool:
"""
Validate plugin AST to ensure no top-level code execution.
Only allows: imports, function definitions, class definitions, and docstrings.
Raises ValueError on violation.
"""
try:
tree = ast.parse(plugin_code)
except SyntaxError as e:
raise ValueError(f"Plugin has syntax errors: {e}")
if not isinstance(tree, ast.Module):
raise ValueError("Plugin must be a valid Python module")
for stmt in tree.body:
if isinstance(stmt, (ast.Import, ast.ImportFrom)):
continue
if isinstance(stmt, (ast.FunctionDef, ast.AsyncFunctionDef)):
continue
if isinstance(stmt, ast.ClassDef):
continue
if isinstance(stmt, ast.Expr):
# Allow docstring literals
if isinstance(stmt.value, (ast.Constant, ast.Str)):
if isinstance(stmt.value, ast.Constant):
if isinstance(stmt.value.value, str):
continue
else:
# ast.Str case (older Python)
continue
raise ValueError(
f"Plugin contains forbidden top-level statement: {stmt.__class__.__name__}. "
"Plugins can only contain imports, functions, classes, and docstrings. "
"No top-level code execution is allowed."
)
return True
def move_to_quarantine(plugin_file: str, quarantine_dir: str, logger) -> Optional[str]:
"""
Move a plugin file to quarantine directory and log the reason via caller.
Returns the final quarantine path or None on failure.
"""
plugin_name = os.path.basename(plugin_file)
os.makedirs(quarantine_dir, exist_ok=True)
quarantine_path = os.path.join(quarantine_dir, plugin_name)
counter = 1
while os.path.exists(quarantine_path):
name, ext = os.path.splitext(plugin_name)
quarantine_path = os.path.join(quarantine_dir, f"{name}_{counter}{ext}")
counter += 1
try:
shutil.move(plugin_file, quarantine_path)
logger.warning(f"Plugin '{plugin_name}' quarantined")
logger.warning(f"Moved to: {quarantine_path}")
return quarantine_path
except Exception as e:
logger.error(f"Failed to quarantine plugin '{plugin_name}': {e}")
return None
def validate_plugin_commands(
plugin_commands: Dict[str, Callable],
plugin_name: str,
protected_commands: Iterable[str] = PROTECTED_COMMANDS,
) -> bool:
"""
Ensure plugins do not override protected commands and values are callable.
Raises ValueError on violation.
"""
violations = [cmd for cmd in plugin_commands.keys() if cmd in protected_commands]
if violations:
raise ValueError(
f"Plugin attempted to override protected commands: {', '.join(violations)}. "
"This is a security violation and the plugin has been quarantined."
)
for cmd_name, cmd_func in plugin_commands.items():
if not callable(cmd_func):
raise ValueError(
f"Plugin command '{cmd_name}' is not callable. All commands must be functions."
)
return True

View File

@@ -1,82 +0,0 @@
"""
System shell command execution utilities for ZDTT.
"""
import sys
import subprocess
import time as time_module
def execute_system_command(terminal, command: str):
"""Execute a system command with real-time I/O streaming."""
status_bar_was_running = terminal.status_bar_thread and terminal.status_bar_thread.is_alive()
try:
process = subprocess.Popen(
command,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
stdin=sys.stdin,
bufsize=1,
text=True,
cwd=terminal.current_dir
)
early_output = []
start_time = time_module.time()
check_timeout = 0.1
hide_output = False
output_buffer = []
try:
while True:
char = process.stdout.read(1)
if not char:
if process.poll() is not None:
break
time_module.sleep(0.01)
continue
if time_module.time() - start_time < check_timeout:
early_output.append(char)
combined = ''.join(early_output).lower()
if 'command not found' in combined or 'not found:' in combined:
hide_output = True
while process.poll() is None:
process.stdout.read(1)
break
output_buffer.append(char)
if char == '\n' or len(output_buffer) >= 1024:
if not hide_output:
sys.stdout.write(''.join(output_buffer))
sys.stdout.flush()
output_buffer.clear()
if output_buffer and not hide_output:
sys.stdout.write(''.join(output_buffer))
sys.stdout.flush()
process.wait()
except BrokenPipeError:
pass
except KeyboardInterrupt:
try:
if 'process' in locals():
process.terminate()
process.wait(timeout=1)
except Exception:
try:
if 'process' in locals():
process.kill()
except Exception:
pass
print("\n^C")
except Exception as e:
if not locals().get('hide_output', False):
print(f"{terminal.COLOR_ERROR}Error executing command: {e}{terminal.COLOR_RESET}")
finally:
if status_bar_was_running:
terminal._render_status_bar()

View File

@@ -1,154 +0,0 @@
"""
Status bar, scroll region, and resize handling utilities for ZDTT.
All functions operate on the provided terminal instance.
"""
import sys
import shutil
from datetime import datetime
def set_scroll_region(terminal):
try:
rows = shutil.get_terminal_size().lines
rows = max(rows, 2)
sys.stdout.write(f"\033[2;{rows}r")
sys.stdout.write("\033[1;1H")
sys.stdout.write("\033[2K")
sys.stdout.write("\033[2;1H")
sys.stdout.flush()
terminal.scroll_region_set = True
except Exception:
terminal.scroll_region_set = False
def reset_scroll_region(terminal):
if not terminal.scroll_region_set:
return
sys.stdout.write("\033[r")
sys.stdout.flush()
terminal.scroll_region_set = False
def build_status_bar_text(terminal):
left_text = f"{terminal.COLOR_BOLD}ZDTT{terminal.COLOR_RESET} by {terminal.COLOR_BOLD}ZaneDev{terminal.COLOR_RESET}"
time_str = datetime.now().strftime("%I:%M %p")
plain_left = "ZDTT by ZaneDev"
plain_time = time_str
try:
term_size = shutil.get_terminal_size()
width = max(1, term_size.columns)
except Exception:
width = max(1, len(plain_left) + len(plain_time) + 6)
min_content_width = len(plain_left) + len(plain_time) + 5
padding = 0 if width < min_content_width else width - min_content_width
separator = f"{terminal.COLOR_DIM}{terminal.COLOR_RESET}"
bar_content = f" {left_text} {' ' * padding}{separator} {terminal.COLOR_BRIGHT_WHITE}{time_str}{terminal.COLOR_RESET} "
actual_display_len = len(plain_left) + len(plain_time) + padding + 5
if actual_display_len < width:
trailing_spaces = width - actual_display_len
bar_content = bar_content.rstrip() + ' ' * trailing_spaces
elif actual_display_len > width:
padding = max(0, width - min_content_width)
bar_content = f" {left_text} {' ' * padding}{separator} {terminal.COLOR_BRIGHT_WHITE}{time_str}{terminal.COLOR_RESET} "
actual_display_len = len(plain_left) + len(plain_time) + padding + 5
if actual_display_len < width:
trailing_spaces = width - actual_display_len
bar_content = bar_content.rstrip() + ' ' * trailing_spaces
else:
if width < len(plain_left) + 10:
bar_content = f" {left_text} {separator} {terminal.COLOR_BRIGHT_WHITE}{time_str[:8]}{terminal.COLOR_RESET} "
bar_content = bar_content[:width] if len(bar_content) > width else bar_content
bg_code, fg_code = terminal.STATUS_BAR_COLORS_LOOKUP()
result = f"\033[{bg_code}m\033[{fg_code}m{bar_content}\033[0m"
if len(result) > width * 2:
simple_bar = f" ZDTT by ZaneDev | {time_str} "
simple_bar = simple_bar[:width] if len(simple_bar) > width else simple_bar.ljust(width)
result = f"\033[{bg_code}m\033[{fg_code}m{simple_bar}\033[0m"
return result
def render_status_bar(terminal):
try:
try:
term_size = shutil.get_terminal_size()
max_width = term_size.columns
except Exception:
max_width = 80
bar_text = build_status_bar_text(terminal)
if len(bar_text) > max_width * 3:
bar_text = build_status_bar_text(terminal)
sys.stdout.write("\033[s")
sys.stdout.write("\033[1;1H")
sys.stdout.write("\033[2K")
sys.stdout.write("\033[0m")
sys.stdout.write(bar_text)
sys.stdout.write("\033[0m")
sys.stdout.write(f"\033[{max_width}G")
sys.stdout.write("\033[u")
sys.stdout.flush()
except Exception:
pass
def status_bar_loop(terminal):
while not terminal.status_bar_stop_event.is_set():
render_status_bar(terminal)
if terminal.status_bar_stop_event.wait(2):
break
def start_status_bar_thread(terminal):
if terminal.status_bar_thread and terminal.status_bar_thread.is_alive():
return
terminal.status_bar_stop_event.clear()
terminal.status_bar_thread = terminal._spawn_thread(target=lambda: status_bar_loop(terminal), name="ZDTTStatusBar")
terminal.status_bar_thread.start()
def initialize_status_bar(terminal):
set_scroll_region(terminal)
start_status_bar_thread(terminal)
render_status_bar(terminal)
def shutdown_status_bar(terminal):
terminal.status_bar_stop_event.set()
if terminal.status_bar_thread and terminal.status_bar_thread.is_alive():
terminal.status_bar_thread.join(timeout=0.5)
terminal.status_bar_thread = None
reset_scroll_region(terminal)
def handle_resize(terminal, signum=None, frame=None):
if not terminal.resize_lock.acquire(blocking=False):
return
try:
import time as time_module
time_module.sleep(0.05)
reset_scroll_region(terminal)
set_scroll_region(terminal)
try:
sys.stdout.write("\033[1;1H")
sys.stdout.write("\033[2K")
sys.stdout.write("\033[0m")
sys.stdout.flush()
except Exception:
pass
render_status_bar(terminal)
try:
term_size = shutil.get_terminal_size()
sys.stdout.write(f"\033[{term_size.lines};1H")
sys.stdout.flush()
except Exception:
pass
except Exception:
pass
finally:
terminal.resize_lock.release()

View File

@@ -1,85 +0,0 @@
"""
UI helpers for ZDTT: banner, compatibility warning, and prompt.
"""
import os
import shutil
def display_banner(terminal):
print()
try:
term_size = shutil.get_terminal_size()
min_height = 13 if not terminal.is_supported else 11
min_width = 44
if term_size.columns < min_width or term_size.lines < min_height:
print(f"ZDTT Terminal v{terminal.version}")
if not terminal.is_supported:
_show_compatibility_warning(terminal)
print()
return
except Exception:
pass
if os.path.exists(terminal.banner_file):
try:
with open(terminal.banner_file, 'r') as f:
custom_banner = f.read()
if '{version}' in custom_banner:
custom_banner = custom_banner.replace('{version}', terminal.version)
print(custom_banner)
if not terminal.is_supported:
_show_compatibility_warning(terminal)
return
except Exception as e:
import logging
logging.error(f"Failed to load custom banner: {e}")
banner = f"""
░█████████ ░███████ ░██████████░██████████
░██ ░██ ░██ ░██ ░██
░██ ░██ ░██ ░██ ░██
░███ ░██ ░██ ░██ ░██
░██ ░██ ░██ ░██ ░██
░██ ░██ ░██ ░██ ░██
░█████████ ░███████ ░██ ░██
ZDTT Terminal v{terminal.version}
"""
print(banner)
if not terminal.is_supported:
_show_compatibility_warning(terminal)
def _show_compatibility_warning(terminal):
if terminal.is_supported:
return
print()
print("⚠️ Running on unsupported system - limited support")
print(" Tested on Debian-based and Arch Linux distributions.")
print()
def get_prompt(terminal):
cwd = os.getcwd()
home = os.path.expanduser("~")
if cwd.startswith(home):
display_path = "~" + cwd[len(home):]
else:
display_path = cwd
RL_PROMPT_START = '\001'
RL_PROMPT_END = '\002'
prompt = (f"{RL_PROMPT_START}{terminal.COLOR_BRIGHT_CYAN}{RL_PROMPT_END}┌─{RL_PROMPT_START}{terminal.COLOR_RESET}{RL_PROMPT_END}"
f"[{RL_PROMPT_START}{terminal.COLOR_BRIGHT_GREEN}{RL_PROMPT_END}{terminal.username}"
f"{RL_PROMPT_START}{terminal.COLOR_RESET}{RL_PROMPT_END}"
f"{RL_PROMPT_START}{terminal.COLOR_BRIGHT_WHITE}{RL_PROMPT_END}@{RL_PROMPT_START}{terminal.COLOR_RESET}{RL_PROMPT_END}"
f"{RL_PROMPT_START}{terminal.COLOR_BRIGHT_CYAN}{RL_PROMPT_END}ZDTT{RL_PROMPT_START}{terminal.COLOR_RESET}{RL_PROMPT_END} "
f"{RL_PROMPT_START}{terminal.COLOR_BRIGHT_BLUE}{RL_PROMPT_END}{display_path}"
f"{RL_PROMPT_START}{terminal.COLOR_RESET}{RL_PROMPT_END}]"
f"{RL_PROMPT_START}{terminal.COLOR_BRIGHT_CYAN}{RL_PROMPT_END}{RL_PROMPT_START}{terminal.COLOR_RESET}{RL_PROMPT_END}\n"
f"{RL_PROMPT_START}{terminal.COLOR_BRIGHT_CYAN}{RL_PROMPT_END}└─{RL_PROMPT_START}{terminal.COLOR_BRIGHT_MAGENTA}{RL_PROMPT_END}{RL_PROMPT_START}{terminal.COLOR_RESET}{RL_PROMPT_END} ")
return prompt