Automated Meeting Minutes Generation using Faster-Whisper, Pyannote, and ChatGPT

Introduction

Creating meeting minutes is a common requirement following most conferences and meetings. This task is often perceived as time-consuming and tedious, especially considering that the resulting documents are rarely read thoroughly. To address this issue, I have developed a Python script that generates meeting minutes from audio or video files. This article introduces this innovative solution.

The script leverages three powerful AI models:

  1. Whisper (for speech recognition)

  2. Pyannote (for speaker identification)

  3. ChatGPT (for text generation)

I previously wrote an article about using Whisper, which demonstrated considerable processing time. In response to this limitation, this article introduces a script utilizing Faster-Whisper, a more efficient alternative.

The speed improvement is notable and highly beneficial for our purposes.

Script Overview

The script operates through the following steps:

  1. Loading and preprocessing the audio/video file

  2. Transcribing the audio using Faster-Whisper

  3. Identifying speakers with Pyannote

  4. Integrating transcription and speaker information

  5. Generating meeting minutes using ChatGPT

Let's examine each component in detail, but first, we'll cover the environment setup. I run this script on Ubuntu 24.04 on WSL, using an NVIDIA RTX 3070Ti GPU.

Environment Setup

Obtaining an OpenAI API Key

Create an account on the OpenAI website and acquire an API key. For detailed instructions, please refer to this article.

Acquiring a Hugging Face Access Token

For guidance on obtaining a Hugging Face access token, please consult this informative article.

https://zenn.dev/protoout/articles/73-hugging-face-setup

Docker Engine Installation

https://docs.docker.com/engine/install/ubuntu/

Remove potentially old Docker installations:

for pkg in docker.io docker-doc docker-compose docker-compose-v2 podman-docker containerd runc; do sudo apt remove $pkg; done

Install Docker Engine:

sudo apt update
sudo apt install ca-certificates curl
sudo install -m 0755 -d /etc/apt/keyrings
sudo curl -fsSL https://download.docker.com/linux/ubuntu/gpg -o /etc/apt/keyrings/docker.asc
sudo chmod a+r /etc/apt/keyrings/docker.asc
echo \
  "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] https://download.docker.com/linux/ubuntu \
  $(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \
  sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
sudo apt update
sudo apt install docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin
sudo docker run hello-world
docker -v
wsl -—shutdown

Configure Docker to run without sudo:

sudo groupadd docker
sudo usermod -aG docker rui
wsl --shutdown

Set Docker to start automatically:

To avoid manually starting Docker after each reboot, add the following to your sudoers file:

sudo visudo

Add this line at the end:

docker ALL=(ALL)  NOPASSWD: /usr/sbin/service docker start
sudo nano ~/.bashrc

Add this line at the end:

if [[ $(service docker status | awk '{print $4}') = "not" ]]; then
sudo service docker start > /dev/null
fi
source ~/.bashrc

NVIDIA Docker Installation

curl -fsSL https://nvidia.github.io/libnvidia-container/gpgkey | sudo gpg --dearmor -o /usr/share/keyrings/nvidia-container-toolkit-keyring.gpg \
&& curl -s -L https://nvidia.github.io/libnvidia-container/stable/deb/nvidia-container-toolkit.list | \
sed 's#deb https://#deb [signed-by=/usr/share/keyrings/nvidia-container-toolkit-keyring.gpg] https://#g' | \
sudo tee /etc/apt/sources.list.d/nvidia-container-toolkit.list
sudo apt update
sudo apt install -y nvidia-container-toolkit
sudo nvidia-ctk runtime configure --runtime=docker
sudo systemctl restart docker

Creating a Docker Container

In your Ubuntu on WSL home folder:
CUDA | NVIDIA NGC

docker pull nvcr.io/nvidia/cuda:12.2.2-cudnn8-devel-ubuntu22.04
docker run -it --gpus all nvcr.io/nvidia/cuda:12.2.2-cudnn8-devel-ubuntu22.04
apt update && apt full-upgrade -y
apt install git wget nano ffmpeg -y

Miniconda Installation

cd ~
mkdir tmp
cd tmp

https://docs.anaconda.com/miniconda/#miniconda-latest-installer-links

wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
bash Miniconda3-latest-Linux-x86_64.sh

# yes, enter, yes
# remove a tmp folder
cd ..
rm -rf tmp

exit

docker container ls -a

docker start <container id>

docker exec -it <container id> /bin/bash

Conda Environment Setup

mkdir faster_whisper
cd faster_whisper
nano environment_faster_whisper.yml
name: faster-whisper
channels:
  - conda-forge
  - pytorch
  - nvidia
  - defaults
dependencies:
  - python=3.11
  - pip
  - nvidia/label/cuda-12.2.2::cuda-toolkit
  - tiktoken
  - pip:
    - faster-whisper
    - pyannote.audio
    - pydub==0.25.1
    - torch
    - torchvision
    - torchaudio
    - moviepy
    - openai
    - python-dotenv
conda env create -f environment_faster_whisper.yml
conda activate faster-whisper

Script

nano faster_whisper_script.py
import os
import subprocess
from faster_whisper import WhisperModel
from pyannote.audio import Pipeline
from pydub import AudioSegment
from moviepy.editor import VideoFileClip
import torch
from openai import OpenAI
from dotenv import load_dotenv
import argparse
import tiktoken

def get_env_variable(var_name):
    value = os.getenv(var_name)
    if value is None:
        raise ValueError(f"Environment variable {var_name} is not set")
    return value

# Load the .env file
load_dotenv()

# Retrieve OpenAI API key and HuggingFace token from environment variables
OPENAI_API_KEY = get_env_variable("OPENAI_API_KEY")
HF_TOKEN = get_env_variable("HF_TOKEN")

def get_device():
    return 'cuda' if torch.cuda.is_available() else 'cpu'

def convert_to_wav(input_file):
    file_ext = os.path.splitext(input_file)[1].lower()
    if file_ext in [".mp3", ".mp4", ".wma"]:
        print(f"Converting {input_file} to WAV")
        wav_file_path = input_file.replace(file_ext, ".wav")
        
        if file_ext == ".wma":
            try:
                subprocess.run(['ffmpeg', '-i', input_file, wav_file_path], check=True)
                print(f"Converted file saved as {wav_file_path}")
            except subprocess.CalledProcessError:
                raise ValueError(f"Error converting WMA file: {input_file}")
        else:
            audio = AudioSegment.from_file(input_file, format=file_ext[1:])
            audio.export(wav_file_path, format="wav")
            print(f"Converted file saved as {wav_file_path}")
        
        return wav_file_path
    elif file_ext == ".wav":
        return input_file
    else:
        raise ValueError(f"Unsupported file format: {file_ext}")

def extract_audio_from_video(video_file):
    audio_file = video_file.replace(".mp4", ".wav")
    print(f"Extracting audio from {video_file}")
    try:
        video = VideoFileClip(video_file)
        audio = video.audio
        audio.write_audiofile(audio_file)
        video.close()
        audio.close()
        print(f"Audio extracted and saved as {audio_file}")
    except Exception as e:
        print(f"An error occurred while extracting audio: {str(e)}")
        raise
    return audio_file

def transcribe_audio(file_path, model_name, beam_size=5, compute_type="float16"):
    print(f"Transcribing audio from {file_path} using model {model_name}")
    print(f"Beam size: {beam_size}, Compute type: {compute_type}")
    device = get_device()
    model = WhisperModel(model_name, device=device, compute_type=compute_type)
    
    try:
        # First, attempt multi-segment language detection
        language_info = model.detect_language_multi_segment(file_path)
        detected_language = language_info[0]
    except AttributeError:
        # If multi-segment method is not available, use standard language detection
        print("Multi-segment language detection not available. Using standard detection.")
        segments, info = model.transcribe(file_path, beam_size=beam_size)
        detected_language = info.language
    
    print(f"Detected language: {detected_language}")
    
    segments, info = model.transcribe(file_path, beam_size=beam_size, language=detected_language)
    
    transcription = ""
    formatted_segments = []
    
    for segment in segments:
        transcription += segment.text + " "
        formatted_segments.append({
            "start": segment.start,
            "end": segment.end,
            "text": segment.text
        })
    
    print(f"Transcription completed")
    return transcription.strip(), formatted_segments

def diarize_audio(file_path, num_speakers=None, min_speakers=None, max_speakers=None):
    print(f"Performing speaker diarization on {file_path}")
    device = get_device()
    print(f"Using device: {device}")
    
    try:
        pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1", use_auth_token=HF_TOKEN)
        pipeline.to(device)
    except Exception as e:
        print(f"Failed to load the pipeline: {e}")
        return None
    
    if num_speakers is not None:
        diarization = pipeline(file_path, num_speakers=num_speakers)
    elif min_speakers is not None and max_speakers is not None:
        diarization = pipeline(file_path, min_speakers=min_speakers, max_speakers=max_speakers)
    else:
        diarization = pipeline(file_path)

    print(f"Speaker diarization completed: {diarization}")
    return diarization

def assign_speakers_to_segments(segments, diarization):
    if diarization is None:
        print("Diarization is None. Skipping speaker assignment.")
        return []

    print("Assigning speakers to segments")
    speaker_segments = []
    current_speaker = None
    current_segment = None
    unmatched_segments = []

    for segment in segments:
        start_time = segment['start']
        end_time = segment['end']
        text = segment['text']
        matched = False

        for turn, _, speaker in diarization.itertracks(yield_label=True):
            overlap = min(end_time, turn.end) - max(start_time, turn.start)
            if overlap > 0:  # Consider it a match if there's an overlap
                if current_speaker == speaker:
                    # Append to the current segment
                    current_segment['end'] = end_time
                    current_segment['text'] += " " + text
                else:
                    # Save the current segment and start a new one
                    if current_segment:
                        speaker_segments.append(current_segment)
                    current_speaker = speaker
                    current_segment = {
                        "start": start_time,
                        "end": end_time,
                        "speaker": speaker,
                        "text": text
                    }
                matched = True
                break
        
        if not matched:
            print(f"No matching speaker found for segment: {text} [{start_time} - {end_time}]")
            unmatched_segments.append({
                "start": start_time,
                "end": end_time,
                "speaker": "UNKNOWN",
                "text": text
            })

    if current_segment:
        speaker_segments.append(current_segment)

    # Merge unmatched segments into speaker_segments
    all_segments = speaker_segments + unmatched_segments
    all_segments.sort(key=lambda x: x['start'])  # Sort by start time

    print("Speakers assigned to segments")
    return all_segments

def count_tokens(text):
    encoding = tiktoken.encoding_for_model("gpt-4-turbo")
    return len(encoding.encode(text))

def split_transcript(transcript, max_tokens_per_chunk):
    words = transcript.split()
    chunks = []
    current_chunk = []
    current_chunk_tokens = 0

    for word in words:
        word_tokens = count_tokens(word)
        if current_chunk_tokens + word_tokens > max_tokens_per_chunk:
            chunks.append(" ".join(current_chunk))
            current_chunk = [word]
            current_chunk_tokens = word_tokens
        else:
            current_chunk.append(word)
            current_chunk_tokens += word_tokens

    if current_chunk:
        chunks.append(" ".join(current_chunk))

    return chunks

def generate_minutes(speaker_segments):
    print("Generating meeting minutes using OpenAI API")
    client = OpenAI(api_key=OPENAI_API_KEY)

    transcript = "\n".join([f"Speaker {seg['speaker']} [{seg['start']} - {seg['end']}]: {seg['text']}" for seg in speaker_segments])
    max_tokens_per_chunk = 4096  # Adjust this based on your token limits and model used

    chunks = split_transcript(transcript, max_tokens_per_chunk)
    all_minutes = []

    for chunk in chunks:
        messages = [
            {"role": "system", "content": "Please create meeting minutes for the following conversation."},
            {"role": "user", "content": chunk}
        ]

        try:
            response = client.chat.completions.create(
                messages=messages,
                model="gpt-4",
                max_tokens=4096,  # Adjust the max tokens as necessary
                n=1,
                stop=None,
                temperature=0.5,
            )

            if response.choices and len(response.choices) > 0:
                minutes = response.choices[0].message.content.strip()
                all_minutes.append(minutes)
            else:
                print("No choices in the response")
                return None

        except Exception as e:
            print(f"An error occurred: {e}")
            return None

    return "\n".join(all_minutes)

def parse_arguments():
    parser = argparse.ArgumentParser(description="Speech to Text with Speaker Diarization")
    parser.add_argument("audio_file", help="Path to the audio or video file")
    parser.add_argument("--model", choices=["tiny", "base", "small", "medium", "large", "large-v2", "large-v3"], 
                        default="base", help="Faster-whisper model to use")
    parser.add_argument("--num_speakers", type=int, help="Number of speakers (if known)")
    parser.add_argument("--min_speakers", type=int, help="Minimum number of speakers")
    parser.add_argument("--max_speakers", type=int, help="Maximum number of speakers")
    parser.add_argument("--beam_size", type=int, default=5, help="Beam size for transcription")
    parser.add_argument("--compute_type", choices=["float16", "float32", "int8"], 
                        default="float16", help="Compute type for the model")
    return parser.parse_args()

def main(args):
    print(f"Processing file: {args.audio_file}")
    file_ext = os.path.splitext(args.audio_file)[1].lower()
    if file_ext == ".mp4":
        audio_file = extract_audio_from_video(args.audio_file)
    else:
        audio_file = args.audio_file
    
    if file_ext in [".mp3", ".mp4", ".wav"]:
        audio_file = convert_to_wav(audio_file)
    
    transcription, segments = transcribe_audio(audio_file, args.model, args.beam_size, args.compute_type)
    print(f"Transcription: {transcription}")
    diarization = diarize_audio(audio_file, num_speakers=args.num_speakers, 
                                min_speakers=args.min_speakers, max_speakers=args.max_speakers)
    speaker_segments = assign_speakers_to_segments(segments, diarization)

    output_file = os.path.splitext(audio_file)[0] + "_output.txt"
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write("Speakers assigned to segments\n")
        for segment in speaker_segments:
            segment_text = f"Speaker {segment['speaker']} [{segment['start']} - {segment['end']}]: {segment['text']}"
            print(segment_text)
            f.write(segment_text + "\n")
    
    print(f"Results written to {output_file}")

    minutes = generate_minutes(speaker_segments)
    if minutes is not None:
        minutes_file = os.path.splitext(audio_file)[0] + "_minutes.txt"
        with open(minutes_file, 'w', encoding='utf-8') as f:
            f.write(minutes)
        print(f"Meeting minutes written to {minutes_file}")
    else:
        print("Failed to generate meeting minutes.")

if __name__ == "__main__":
    args = parse_arguments()
    main(args)

Create a .env file with your API keys:

nano .env
OPENAI_API_KEY=sk-XXXXXXXXXXXXXXXXXXXXXX
HF_TOKEN=hf_ZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZ

Usage Instructions

Help message

python faster_whisper_script.py --help
usage: faster_whisper_script.py [-h] [--model {tiny,base,small,medium,large,large-v2,large-v3}] [--num_speakers NUM_SPEAKERS] [--min_speakers MIN_SPEAKERS] [--max_speakers MAX_SPEAKERS]
                                [--beam_size BEAM_SIZE] [--compute_type {float16,float32,int8}]
                                audio_file

Speech to Text with Speaker Diarization

positional arguments:
  audio_file            Path to the audio or video file

options:
  -h, --help            show this help message and exit
  --model {tiny,base,small,medium,large,large-v2,large-v3}
                        Faster-whisper model to use
  --num_speakers NUM_SPEAKERS
                        Number of speakers (if known)
  --min_speakers MIN_SPEAKERS
                        Minimum number of speakers
  --max_speakers MAX_SPEAKERS
                        Maximum number of speakers
  --beam_size BEAM_SIZE
                        Beam size for transcription
  --compute_type {float16,float32,int8}
                        Compute type for the model

Usage Instructions

# on ubuntu command line
# docker container ls -a
docker cp "/mnt/c/Windows_path/VVVV.wav" docker_container_name:root/speech_to_text

Example usage

python faster_whisper_script.py XXXX.wav --model large-v3 --min_speakers 1 --max_speakers 5

To copy files from the Docker container to Windows:

docker cp docker_container_name:root/speech_to_text/xxx.txt "/mnt/c/Windows path/"

Conclusion

This Python script combines cutting-edge AI technologies to automatically generate high-quality meeting minutes from audio or video inputs. It significantly streamlines the process of recording meetings and transcribing lectures.

The implementation of Faster-Whisper has notably reduced processing time, enhancing the efficiency of the entire workflow.

I hope this tool allows you to allocate more time to your educational and research activities, leading to increased productivity and satisfaction in your academic endeavors.

The rapid advancement of AI technology is truly remarkable. I encourage you to explore and utilize such AI technologies to create more efficient work environments tailored to your needs.

Your feedback, questions, and suggestions for improvement are highly valued. Please feel free to leave a comment below.

いいなと思ったら応援しよう!