Refactor terminal command execution and update help text
- Replaced the -oszdtt flag with an auto shell fallback for unknown commands. - Introduced a new method for executing system commands with real-time I/O streaming. - Enhanced error handling for command execution and improved user feedback.
This commit is contained in:
109
terminal.py
109
terminal.py
@@ -923,8 +923,8 @@ ZDTT Terminal v{self.version}
|
||||
print(f"{self.COLOR_BRIGHT_MAGENTA}{self.COLOR_BOLD}Features:{self.COLOR_RESET}")
|
||||
print(f" {self.COLOR_BRIGHT_YELLOW}↑/↓ arrows{self.COLOR_RESET} - Navigate command history")
|
||||
print(f" {self.COLOR_BRIGHT_YELLOW}Tab{self.COLOR_RESET} - Auto-complete commands/files")
|
||||
print(f" {self.COLOR_BRIGHT_YELLOW}-oszdtt flag{self.COLOR_RESET} - Run any command in system shell")
|
||||
print(f" {self.COLOR_DIM}Example: htop -oszdtt{self.COLOR_RESET}")
|
||||
print(f" {self.COLOR_BRIGHT_YELLOW}Auto shell fallback{self.COLOR_RESET} - Unknown commands run in system shell")
|
||||
print(f" {self.COLOR_DIM}Example: htop (auto-runs in shell){self.COLOR_RESET}")
|
||||
print()
|
||||
|
||||
def cmd_clear(self, args):
|
||||
@@ -1652,6 +1652,96 @@ ZDTT Terminal v{self.version}
|
||||
|
||||
print("Unable to locate the ZDTT updater.")
|
||||
print("Re-run the installer script or use 'zdtt update' from your shell if available.")
|
||||
|
||||
def _execute_system_command(self, command):
|
||||
"""Execute a system command with real-time I/O streaming."""
|
||||
# Temporarily disable status bar updates during command execution
|
||||
status_bar_was_running = self.status_bar_thread and self.status_bar_thread.is_alive()
|
||||
|
||||
try:
|
||||
# Start the process with direct stdin/stdout/stderr
|
||||
process = subprocess.Popen(
|
||||
command,
|
||||
shell=True,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT, # Merge stderr into stdout
|
||||
stdin=sys.stdin, # Direct stdin passthrough
|
||||
bufsize=1, # Line buffered
|
||||
text=True,
|
||||
cwd=self.current_dir
|
||||
)
|
||||
|
||||
# Buffer for early output detection
|
||||
early_output = []
|
||||
start_time = time_module.time()
|
||||
check_timeout = 0.1 # 0.1 seconds
|
||||
hide_output = False
|
||||
output_buffer = []
|
||||
|
||||
# Read output in real-time
|
||||
try:
|
||||
while True:
|
||||
# Read character by character for early detection
|
||||
char = process.stdout.read(1)
|
||||
if not char:
|
||||
if process.poll() is not None:
|
||||
break
|
||||
time_module.sleep(0.01)
|
||||
continue
|
||||
|
||||
# Check for "command not found" in first 0.1 seconds
|
||||
if time_module.time() - start_time < check_timeout:
|
||||
early_output.append(char)
|
||||
combined = ''.join(early_output).lower()
|
||||
if 'command not found' in combined or 'not found:' in combined:
|
||||
hide_output = True
|
||||
# Consume remaining output silently
|
||||
while process.poll() is None:
|
||||
process.stdout.read(1)
|
||||
break
|
||||
|
||||
# Buffer output
|
||||
output_buffer.append(char)
|
||||
|
||||
# If we have a complete line or enough chars, flush
|
||||
if char == '\n' or len(output_buffer) >= 1024:
|
||||
if not hide_output:
|
||||
sys.stdout.write(''.join(output_buffer))
|
||||
sys.stdout.flush()
|
||||
output_buffer.clear()
|
||||
|
||||
# Flush remaining buffer
|
||||
if output_buffer and not hide_output:
|
||||
sys.stdout.write(''.join(output_buffer))
|
||||
sys.stdout.flush()
|
||||
|
||||
# Wait for process to finish
|
||||
process.wait()
|
||||
|
||||
except BrokenPipeError:
|
||||
# Process closed stdout
|
||||
pass
|
||||
|
||||
except KeyboardInterrupt:
|
||||
# Handle Ctrl+C
|
||||
try:
|
||||
if 'process' in locals():
|
||||
process.terminate()
|
||||
process.wait(timeout=1)
|
||||
except Exception:
|
||||
try:
|
||||
if 'process' in locals():
|
||||
process.kill()
|
||||
except Exception:
|
||||
pass
|
||||
print("\n^C")
|
||||
except Exception as e:
|
||||
if not hide_output:
|
||||
print(f"{self.COLOR_ERROR}Error executing command: {e}{self.COLOR_RESET}")
|
||||
finally:
|
||||
# Restore status bar if it was running
|
||||
if status_bar_was_running:
|
||||
self._render_status_bar()
|
||||
|
||||
def execute_command(self, command_line):
|
||||
"""Parse and execute a command"""
|
||||
@@ -1661,18 +1751,12 @@ ZDTT Terminal v{self.version}
|
||||
# Expand aliases first
|
||||
command_line = self.expand_aliases(command_line)
|
||||
|
||||
# Check for -oszdtt flag (Outside ZDTT)
|
||||
# Check for -oszdtt flag (Outside ZDTT) - still supported for explicit shell execution
|
||||
if '-oszdtt' in command_line:
|
||||
# Remove the -oszdtt flag and execute as system command
|
||||
system_command = command_line.replace('-oszdtt', '').strip()
|
||||
if system_command:
|
||||
try:
|
||||
result = os.system(system_command)
|
||||
# os.system returns the exit code
|
||||
if result != 0:
|
||||
pass # Command already displayed its error
|
||||
except Exception as e:
|
||||
print(f"Error executing command: {e}")
|
||||
self._execute_system_command(system_command)
|
||||
else:
|
||||
print("No command specified with -oszdtt flag")
|
||||
return
|
||||
@@ -1692,9 +1776,8 @@ ZDTT Terminal v{self.version}
|
||||
if cmd in self.commands:
|
||||
self.commands[cmd](args)
|
||||
else:
|
||||
print(f"{self.COLOR_ERROR}Command not found: {self.COLOR_BRIGHT_RED}{cmd}{self.COLOR_RESET}")
|
||||
print(f"Type {self.COLOR_BRIGHT_GREEN}'help'{self.COLOR_RESET} for available commands.")
|
||||
print(f"{self.COLOR_DIM}Tip: Use -oszdtt flag to run system commands{self.COLOR_RESET}")
|
||||
# Command not found in ZDTT - automatically run in shell
|
||||
self._execute_system_command(command_line)
|
||||
|
||||
def run(self):
|
||||
"""Main terminal loop"""
|
||||
|
||||
Reference in New Issue
Block a user