
Transcribe Audio with Multiple Speakers

 


 

🧠 WhisperX Offline Transcription Setup with GUI

📝 Summary

This guide details how to set up and patch WhisperX to transcribe long audio files (MP3, MP4, etc.) offline using a GUI-based Python app, bypassing VAD model downloads and network dependencies.


📦 Project Overview

  • Platform: Python 3.12 with WhisperX + SpeechBrain

  • Goal: Offline GUI app for audio transcription with speaker diarization

  • Input: Audio/Video file

  • Output: Timestamped transcript with speaker labels


✅ Key Features

  • No network requirement for VAD

  • GUI with file selection and logging

  • Long file support (tested on 3hr+ MP3)

  • Speaker diarization using speechbrain

  • Chunk-based transcription (VAD manually bypassed)


🛠️ Setup Instructions

1. 🐍 Python Environment

```bash
python -m venv .venv
.venv\Scripts\activate
pip install whisperx torchaudio pydub speechbrain
# Note: tkinter ships with the standard Windows Python installer; it is not pip-installable.
```

2. 📁 Folder Structure

```text
/transcriber/
├── transcribe.py                  # GUI application
├── models/vad/pytorch_model.bin   # Downloaded manually
└── .venv/...
```

3. 🔧 Environment Variable (Set in transcribe.py)

```python
os.environ["WHISPERX_VAD_MODEL_PATH"] = r"D:\PY\models\vad\pytorch_model.bin"
```

4. 🎯 GUI Usage

Run:

```bash
python transcribe.py
```

Then:

  • Select audio file

  • Choose output and temp folders

  • Click Transcribe


🔧 WhisperX Modifications

vad.py Patch

  • Replaced Hugging Face model download with local load

  • Stubbed merge_chunks() for compatibility

```python
def load_vad_model(...):
    model_fp = os.environ.get("WHISPERX_VAD_MODEL_PATH")
    if not model_fp or not os.path.exists(model_fp):
        raise FileNotFoundError("Local VAD model path invalid.")
    print(f"Using local VAD model at: {model_fp}")
    bundle = torchaudio.pipelines.HUBERT_BASE
    return bundle.get_model().to(device).eval()

def merge_chunks(chunks, *args, **kwargs):
    return chunks
```
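
Note that the torchaudio `HUBERT_BASE` bundle here acts only as a stand-in so callers receive a valid model object; actual segmentation is done by the manual 30-second chunking described next, and `merge_chunks()` is a pass-through kept purely for API compatibility.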

asr.py Patch

  • Skipped internal VAD model logic

  • Injected manual chunking (30s per segment)

Modified transcribe() inside FasterWhisperPipeline:

```python
duration = audio.shape[0] / SAMPLE_RATE
chunk_duration = 30.0
vad_segments = []
start = 0.0
while start < duration:
    end = min(start + chunk_duration, duration)
    vad_segments.append({"start": start, "end": end})
    start = end
```

🐛 Issues Resolved

| Issue | Resolution |
| --- | --- |
| `TranscriptionOptions.__new__()` missing args | Manually passed `asr_options` with the required fields |
| HTTP 301 when fetching the VAD model | Replaced the remote download with a local `.bin` path |
| `'dict' has no attribute 'ndim'` | Dummy VAD model returned an incompatible type, so VAD was bypassed entirely |
| `vad_segments` unexpected argument | Removed the invalid parameter from the `transcribe()` call |
| Input shape `(1, 80, 782456)` too large | Chunked audio manually into 30-second segments |

📁 Final Notes

  • Long audio files (2–3 hrs) may take 30–60+ minutes depending on CPU speed

  • Recommended: run on GPU, or split files into 1-hour batches (see the sketch after this list)

  • Supports .mp3, .wav, .mp4, .mkv, .m4a

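If you prefer to pre-split long recordings, here is a minimal sketch using pydub (already a dependency of this setup; the one-hour batch length is this guide's recommendation, not a WhisperX requirement):

```python
from pydub import AudioSegment
import os

def split_into_batches(input_file, out_dir, batch_minutes=60):
    """Export a long recording as consecutive WAV batches of batch_minutes each."""
    audio = AudioSegment.from_file(input_file)
    batch_ms = batch_minutes * 60 * 1000
    stem = os.path.splitext(os.path.basename(input_file))[0]
    paths = []
    for i, start in enumerate(range(0, len(audio), batch_ms)):
        chunk = audio[start:start + batch_ms]  # pydub slices by milliseconds
        path = os.path.join(out_dir, f"{stem}_part{i:02d}.wav")
        chunk.export(path, format="wav")
        paths.append(path)
    return paths
```

Each batch can then be fed through the GUI (or `transcribe_audio()`) in turn.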

💾 Files to Backup for Future Use

  • transcribe.py

  • Patched: whisperx/vad.py

  • Patched: whisperx/asr.py

  • pytorch_model.bin saved locally


🧩 Future Improvements

  • Optional: add GUI dropdown for model size (base/medium/large)

  • Optional: progress bar and chunk counters

  • Optional: automatic chunked transcription and merge (a minimal sketch follows below)
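
A minimal sketch of what automatic chunk-and-merge could look like, assuming a `transcribe_file()` helper that wraps the pipeline above and returns WhisperX-style segment dicts (the helper name and offset handling are illustrative, not part of the current script):

```python
def transcribe_in_batches(batch_paths, batch_minutes=60):
    """Hypothetical: transcribe each batch and merge segments onto one timeline."""
    merged = []
    for i, path in enumerate(batch_paths):
        offset = i * batch_minutes * 60  # seconds elapsed before this batch
        for seg in transcribe_file(path):  # assumed helper returning segment dicts
            seg["start"] += offset
            seg["end"] += offset
            merged.append(seg)
    return merged
```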

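📄 Full transcribe.py Listing

For reference, the complete GUI application described in this guide: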
    import whisperx
    import os
    import tkinter as tk
    from tkinter import filedialog, ttk, scrolledtext
    from pydub import AudioSegment
    import logging
    import subprocess
    import sys
    import shutil
    import warnings
    
    # Suppress deprecation warnings
    warnings.filterwarnings("ignore", category=UserWarning)
    
    # Set up logging
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
    logger = logging.getLogger()
    
    # Force WhisperX to use local VAD model to avoid redirect error
    os.environ["WHISPERX_VAD_MODEL_PATH"] = r"D:\\PY\\models\\vad\\pytorch_model.bin"
    
    # Redirect logging to GUI log window
    class TextHandler(logging.Handler):
        def __init__(self, text_widget):
            super().__init__()
            self.text_widget = text_widget
    
        def emit(self, record):
            try:
                msg = self.format(record)
                if self.text_widget.winfo_exists():  # Check if widget still exists
                    self.text_widget.insert(tk.END, msg + '\n')
                    self.text_widget.see(tk.END)
                    self.text_widget.update()
            except tk.TclError:
                pass  # Ignore errors if GUI is closed
    
    # Supported audio formats
    SUPPORTED_FORMATS = ['.wav', '.m4a', '.mp3', '.mp4', '.mkv']
    
    # Function to convert audio to WAV if not already WAV
    
    def convert_to_wav(input_file, output_dir, temp_dir):
        file_ext = os.path.splitext(input_file)[1].lower()
        if file_ext == '.wav':
            logger.info(f"Input file {input_file} is already WAV. No conversion needed.")
            return input_file
    
        output_file = os.path.join(temp_dir, os.path.splitext(os.path.basename(input_file))[0] + '.wav')
        try:
            format_param = 'matroska' if file_ext == '.mkv' else file_ext[1:]
            audio = AudioSegment.from_file(input_file, format=format_param)
            audio.export(output_file, format='wav')
            logger.info(f"Converted {input_file} to {output_file}")
            return output_file
        except Exception as e:
            logger.error(f"Error converting {input_file} to WAV: {str(e)}")
            try:
                result = subprocess.run(
                    ['ffmpeg', '-i', input_file],
                    capture_output=True, text=True, check=False
                )
                logger.error(f"FFmpeg output: {result.stderr}")
            except Exception as ffmpeg_e:
                logger.error(f"Could not run FFmpeg to diagnose file: {str(ffmpeg_e)}")
            raise
    
    # Main transcription function
    
    def transcribe_audio(input_file, output_dir, temp_dir):
        wav_file = None
        try:
            # Convert to WAV if necessary
            wav_file = convert_to_wav(input_file, output_dir, temp_dir)
    
            # Load the model
            logger.info("Loading WhisperX model...")
            asr_options = {
                "max_new_tokens": 448,
                "clip_timestamps": False,
                "hallucination_silence_threshold": 0.6
            }
            model = whisperx.load_model("base", device="cpu", compute_type="float32", asr_options=asr_options)
    
            # Transcribe
            logger.info("Transcribing audio...")
            result = model.transcribe(
                wav_file,
                batch_size=16,
                language=None,
            )
    
            # Align timestamps
            logger.info("Aligning timestamps...")
            model_a, metadata = whisperx.load_align_model(language_code="en", device="cpu")
            result = whisperx.align(result["segments"], model_a, metadata, wav_file, device="cpu")
    
            # Diarization
            logger.info("Performing diarization...")
            hf_token = "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"  # Replace with your token
            try:
                diarize_model = whisperx.diarize.DiarizationPipeline(use_auth_token=hf_token, device="cpu")
                diarize_segments = diarize_model(wav_file)
                result = whisperx.assign_word_speakers(diarize_segments, result)
            except AttributeError:
                logger.warning("DiarizationPipeline not available in this whisperx version. Skipping diarization.")
                for segment in result["segments"]:
                    segment["speaker"] = "Unknown"
    
            # Save output
            output_file = os.path.join(output_dir, "transcription_with_speakers.txt")
            with open(output_file, "w") as f:
                for segment in result["segments"]:
                    start = segment["start"]
                    end = segment["end"]
                    text = segment["text"]
                    speaker = segment.get("speaker", "Unknown")
                    f.write(f"[{start:.2f}s - {end:.2f}s] Speaker {speaker}: {text}\n")
    
            logger.info(f"Transcription complete. Output saved to {output_file}")
    
        except Exception as e:
            logger.error(f"Error during transcription: {str(e)}")
            raise
        finally:
            if wav_file and wav_file != input_file and os.path.exists(wav_file):
                try:
                    os.remove(wav_file)
                    logger.info(f"Removed temporary WAV file: {wav_file}")
                except Exception as e:
                    logger.warning(f"Could not remove temporary WAV file {wav_file}: {str(e)}")
    
    # GUI Application
    
    class TranscriptionApp:
        def __init__(self, root):
            self.root = root
            self.root.title("Audio Transcription")
            self.root.geometry("600x600")
    
            tk.Label(root, text="Input Audio File:").pack(pady=5)
            self.input_entry = tk.Entry(root, width=50)
            self.input_entry.pack(pady=5)
            tk.Button(root, text="Browse", command=self.browse_input).pack(pady=5)
    
            tk.Label(root, text="Output Directory:").pack(pady=5)
            self.output_entry = tk.Entry(root, width=50)
            self.output_entry.pack(pady=5)
            tk.Button(root, text="Browse", command=self.browse_output).pack(pady=5)
    
            tk.Label(root, text="Temporary Directory (for WAV files):").pack(pady=5)
            self.temp_entry = tk.Entry(root, width=50)
            self.temp_entry.insert(0, "D:\\PY\\temp")
            self.temp_entry.pack(pady=5)
            tk.Button(root, text="Browse", command=self.browse_temp).pack(pady=5)
    
            tk.Button(root, text="Transcribe", command=self.start_transcription).pack(pady=10)
    
            tk.Label(root, text="Log:").pack(pady=5)
            self.log_text = scrolledtext.ScrolledText(root, height=10, width=60, wrap=tk.WORD)
            self.log_text.pack(pady=5)
    
            text_handler = TextHandler(self.log_text)
            text_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
            logger.addHandler(text_handler)
    
        def browse_input(self):
            file_path = filedialog.askopenfilename(filetypes=[("Audio/Video Files", "*.wav *.m4a *.mp3 *.mp4 *.mkv")])
            if file_path:
                self.input_entry.delete(0, tk.END)
                self.input_entry.insert(0, file_path)
    
        def browse_output(self):
            dir_path = filedialog.askdirectory()
            if dir_path:
                self.output_entry.delete(0, tk.END)
                self.output_entry.insert(0, dir_path)
    
        def browse_temp(self):
            dir_path = filedialog.askdirectory()
            if dir_path:
                self.temp_entry.delete(0, tk.END)
                self.temp_entry.insert(0, dir_path)
    
        def start_transcription(self):
            input_file = self.input_entry.get()
            output_dir = self.output_entry.get()
            temp_dir = self.temp_entry.get()
    
            if not input_file or not output_dir or not temp_dir:
                logger.error("Please select input file, output directory, and temporary directory.")
                return
    
            if not os.path.exists(input_file):
                logger.error(f"Input file {input_file} does not exist.")
                return
    
            if os.path.splitext(input_file)[1].lower() not in SUPPORTED_FORMATS:
                logger.error(f"Unsupported file format. Supported formats: {', '.join(SUPPORTED_FORMATS)}")
                return
    
            if not os.path.exists(output_dir):
                try:
                    os.makedirs(output_dir)
                    logger.info(f"Created output directory: {output_dir}")
                except Exception as e:
                    logger.error(f"Could not create output directory {output_dir}: {str(e)}")
                    return
    
            if not os.path.exists(temp_dir):
                try:
                    os.makedirs(temp_dir)
                    logger.info(f"Created temporary directory: {temp_dir}")
                except Exception as e:
                    logger.error(f"Could not create temporary directory {temp_dir}: {str(e)}")
                    return
    
            import threading
            threading.Thread(target=transcribe_audio, args=(input_file, output_dir, temp_dir), daemon=True).start()
    
    # Main execution
    if __name__ == "__main__":
        try:
            subprocess.run(['ffmpeg', '-version'], capture_output=True, check=True)
        except (subprocess.CalledProcessError, FileNotFoundError):
            logger.error("FFmpeg is not installed or not found. Please install FFmpeg to proceed.")
            sys.exit(1)
    
        default_temp_dir = "D:\\PY\\temp"
        if not os.path.exists(default_temp_dir):
            try:
                os.makedirs(default_temp_dir)
            except Exception as e:
                logger.error(f"Could not create default temporary directory {default_temp_dir}: {str(e)}")
                sys.exit(1)
    
        root = tk.Tk()
        app = TranscriptionApp(root)
        root.mainloop()
    
        try:
            if os.path.exists(default_temp_dir):
                shutil.rmtree(default_temp_dir)
                logger.info(f"Cleaned up default temporary directory: {default_temp_dir}")
        except Exception as e:
            logger.warning(f"Could not clean up default temporary directory {default_temp_dir}: {str(e)}")