189 lines
6.5 KiB
Python
189 lines
6.5 KiB
Python
|
|
#!/usr/bin/env python3
|
||
|
|
"""
|
||
|
|
voice_cloak.py — Real-time mic → Whisper → TTS → virtual sink

Speaks each phrase as soon as it is transcribed, instead of waiting for a
long silence.

Requirements:
    pip install faster-whisper sounddevice numpy edge-tts
    pactl / pw-cli (PulseAudio/PipeWire), plus paplay for playback

First run: select "VoiceCloakMic" as your mic in Discord or any other app.
|
||
|
|
"""
|
||
|
|
|
||
|
|
import asyncio
|
||
|
|
import subprocess
|
||
|
|
import tempfile
|
||
|
|
import threading
|
||
|
|
import queue
|
||
|
|
import sys
|
||
|
|
import time
|
||
|
|
import os
|
||
|
|
import numpy as np
|
||
|
|
import sounddevice as sd
|
||
|
|
from faster_whisper import WhisperModel
|
||
|
|
|
||
|
|
# ── Config ────────────────────────────────────────────────────────────────────
|
||
|
|
# Capture format handed to sounddevice's InputStream.
SAMPLE_RATE: int = 16_000
CHANNELS: int = 1
# Length of one captured block in seconds; smaller = more responsive.
BLOCK_SECONDS: float = 0.3

# Phrase detection in the transcribe loop.
# A block whose RMS is below this value counts as silence.
SILENCE_THRESH: float = 0.012
# Consecutive silent blocks before the buffer is flushed (5 x 0.3 s ~= 1.5 s).
SILENCE_CHUNKS: int = 5
# Hard cap on buffered blocks before a forced flush (60 x 0.3 s ~= 18 s).
MAX_BUFFER_CHUNKS: int = 60

# Speech-to-text model name: tiny.en / base.en / small.en
WHISPER_MODEL: str = "base.en"
# Voice name passed to edge-tts.
TTS_VOICE: str = "en-US-RogerNeural"

# Names of the null sink and of the virtual source created on its monitor.
SINK_NAME: str = "VoiceCloak"
VIRTUAL_MIC: str = "VoiceCloakMic"
|
||
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
# Hand-off points shared by the worker threads.
# Captured microphone blocks: filled by audio_callback, drained by transcribe_loop.
audio_queue: queue.Queue = queue.Queue()
# Transcribed phrases: filled by transcribe_loop, drained by tts_worker.
tts_queue: queue.Queue = queue.Queue()
# Set on shutdown; every worker loop checks it to know when to exit.
stop_event: threading.Event = threading.Event()
|
||
|
|
|
||
|
|
|
||
|
|
# ── Virtual sink ──────────────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
def run(cmd: str) -> str:
    """Run *cmd* through the shell and return its stdout, stripped.

    stderr is captured and discarded and the exit status is not checked,
    so a failing command simply yields whatever it wrote to stdout
    (typically an empty string).
    """
    completed = subprocess.run(
        cmd,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    return completed.stdout.strip()
|
||
|
|
|
||
|
|
def setup_virtual_sink():
    """Ensure the null sink and the virtual microphone built on it both exist.

    The sink and the source are checked independently, so a previous run
    that created the sink but failed to create the microphone is repaired
    instead of being reported as "already exists". Only the missing
    module(s) are loaded.

    Exits the process with status 1 if either module cannot be loaded.
    """
    sink_ready = SINK_NAME in run("pactl list short sinks")
    mic_ready = VIRTUAL_MIC in run("pactl list short sources")
    if sink_ready and mic_ready:
        print(f"[sink] {SINK_NAME} already exists")
        return

    if not sink_ready:
        print(f"[sink] Creating virtual sink '{SINK_NAME}'...")
        # `pactl load-module` prints the new module index on success, so an
        # empty result from run() means the load failed.
        sink_ready = bool(run(
            f"pactl load-module module-null-sink sink_name={SINK_NAME} "
            f"sink_properties=device.description={SINK_NAME}"
        ))

    # The microphone is fed by the sink's monitor, so it needs the sink first.
    if sink_ready and not mic_ready:
        print(f"[sink] Creating virtual mic '{VIRTUAL_MIC}'...")
        mic_ready = bool(run(
            f"pactl load-module module-virtual-source source_name={VIRTUAL_MIC} "
            f"master={SINK_NAME}.monitor"
        ))

    if sink_ready and mic_ready:
        print(f"[sink] ✓ Ready. Set '{VIRTUAL_MIC}' as your mic in apps.")
    else:
        print("[sink] ✗ Failed. Running PulseAudio/Pipewire?")
        sys.exit(1)
|
||
|
|
|
||
|
|
|
||
|
|
# ── Audio capture ─────────────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
def audio_callback(indata, frames, time_info, status):
    """Input-stream callback: log any status flags and queue the new block.

    `frames` and `time_info` are accepted because the callback signature
    requires them; they are not used.
    """
    # Queue a private copy rather than the array handed to the callback.
    block = indata.copy()
    if status:
        print(f"[audio] {status}", file=sys.stderr)
    audio_queue.put(block)
|
||
|
|
|
||
|
|
def capture_thread():
    """Keep the microphone input stream open until shutdown is requested.

    The stream delivers blocks of SAMPLE_RATE * BLOCK_SECONDS frames to
    `audio_callback`. If the stream cannot be opened or started, the error
    is reported and `stop_event` is set so the other loops exit instead of
    waiting forever on a queue that will never be filled.
    """
    try:
        with sd.InputStream(
            samplerate=SAMPLE_RATE,
            channels=CHANNELS,
            dtype="float32",
            blocksize=int(SAMPLE_RATE * BLOCK_SECONDS),
            callback=audio_callback,
        ):
            print("[mic] Listening... (Ctrl+C to stop)")
            # Block until shutdown; wakes as soon as the event is set
            # instead of polling on a sleep interval.
            stop_event.wait()
    except sd.PortAudioError as exc:
        print(f"[mic] Could not open input stream: {exc}", file=sys.stderr)
        stop_event.set()
|
||
|
|
|
||
|
|
|
||
|
|
# ── Transcribe loop ───────────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
def transcribe_loop(model: WhisperModel):
    """Group microphone blocks into phrases, transcribe them, queue the text.

    Blocks are pulled from `audio_queue` until `stop_event` is set. Blocks
    whose RMS reaches SILENCE_THRESH are buffered; quieter blocks only
    advance a silence counter. The buffer is transcribed once
    SILENCE_CHUNKS silent blocks in a row have been seen, or once it holds
    MAX_BUFFER_CHUNKS blocks. Every non-empty segment text is printed and
    pushed onto `tts_queue`.

    NOTE(review): the nesting here is reconstructed from a source whose
    indentation was lost. Buffering is assumed to happen only for
    non-silent blocks (otherwise the "buffer is non-empty" guard would be
    redundant) — confirm against the original file.
    """
    pending = []
    quiet_run = 0

    while not stop_event.is_set():
        try:
            block = audio_queue.get(timeout=0.5)
        except queue.Empty:
            continue

        level = float(np.sqrt(np.mean(block ** 2)))
        is_quiet = level < SILENCE_THRESH
        if is_quiet:
            quiet_run += 1
        else:
            quiet_run = 0
            pending.append(block)

        # Nothing buffered yet: there is nothing to flush.
        if not pending:
            continue
        # Keep collecting until the pause is long enough or the cap is hit.
        if quiet_run < SILENCE_CHUNKS and len(pending) < MAX_BUFFER_CHUNKS:
            continue

        samples = np.concatenate(pending, axis=0).flatten()
        pending.clear()
        quiet_run = 0

        # Segments are consumed as they are produced, so each phrase is
        # queued for speech as soon as it is available.
        segments, _info = model.transcribe(
            samples,
            language="en",
            vad_filter=True,
            vad_parameters={"min_silence_duration_ms": 200},
        )
        for segment in segments:
            text = segment.text.strip()
            if not text:
                continue
            print(f"[transcribed] {text}")
            tts_queue.put(text)
|
||
|
|
|
||
|
|
|
||
|
|
# ── TTS worker (serial so segments don't overlap) ─────────────────────────────
|
||
|
|
|
||
|
|
def tts_worker():
    """Speak queued phrases one at a time until shutdown is requested.

    Runs in its own thread with a private asyncio event loop. Each phrase
    taken from `tts_queue` is spoken to completion before the next one is
    fetched, so phrases never overlap. The loop is closed when the worker
    exits, whether it stops normally or because `speak` raised.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        while not stop_event.is_set():
            try:
                # Short timeout so the stop flag is re-checked regularly.
                text = tts_queue.get(timeout=0.5)
            except queue.Empty:
                continue
            loop.run_until_complete(speak(text))
    finally:
        loop.close()
|
||
|
|
|
||
|
|
|
||
|
|
async def speak(text: str):
    """Synthesize *text* with edge-tts and play it on two outputs at once.

    The audio is saved to a temporary .mp3 file, then played with `paplay`
    both into the SINK_NAME sink and on the default output. The temporary
    file is removed afterwards. Errors are printed, not raised, so one
    failed phrase does not stop the TTS worker.
    """
    import edge_tts

    # Only the path is needed: the handle is closed immediately and
    # edge-tts writes the file itself.
    with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as f:
        tmp_path = f.name
    try:
        communicate = edge_tts.Communicate(text, TTS_VOICE)
        await communicate.save(tmp_path)

        # Start both players before waiting so they run simultaneously:
        # first the virtual sink, then the default output (your speakers).
        player_args = (["--device", SINK_NAME], [])
        players = []
        try:
            for extra in player_args:
                players.append(subprocess.Popen(["paplay", *extra, tmp_path]))
        finally:
            # Wait for every player that did start, even if a later launch
            # failed, and surface non-zero exits instead of ignoring them.
            for player in players:
                status = player.wait()
                if status != 0:
                    print(f"[tts] paplay exited with status {status}")
    except Exception as e:
        print(f"[tts] Error: {e}")
    finally:
        try:
            os.unlink(tmp_path)
        except FileNotFoundError:
            # Already gone; nothing left to clean up.
            pass
|
||
|
|
|
||
|
|
|
||
|
|
# ── Main ──────────────────────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
def main():
    """Set up the virtual devices, load Whisper, and run the pipeline.

    Capture and TTS run in daemon threads; transcription runs in the
    calling thread. Ctrl+C stops the pipeline. Shutdown (setting
    `stop_event` and briefly joining the workers) runs on every exit path
    from the transcribe loop, not only on Ctrl+C.
    """
    banner = "=" * 55
    print(banner)
    print(" voice_cloak — streaming phrase-by-phrase")
    print(banner)

    setup_virtual_sink()

    print(f"[whisper] Loading '{WHISPER_MODEL}'...")
    model = WhisperModel(WHISPER_MODEL, device="cpu", compute_type="int8")
    print("[whisper] ✓ Ready")

    threads = [
        threading.Thread(target=worker, daemon=True)
        for worker in (capture_thread, tts_worker)
    ]
    for t in threads:
        t.start()

    try:
        transcribe_loop(model)
    except KeyboardInterrupt:
        print("\n[main] Stopping...")
    finally:
        stop_event.set()
        # Bounded join: give the workers a moment to finish cleanly, but
        # never hang on exit (they are daemon threads).
        for t in threads:
            t.join(timeout=2)
        print("[main] Done.")
|
||
|
|
|
||
|
|
|
||
|
|
# Run the pipeline only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|