Talk to Videos
Developing an interactive AI application for video-based learning in education and business

gpt-4o was trained on both text and images and is among the first truly multimodal LLMs that can natively understand both. Other modalities, such as audio, are integrated into modern LLMs through additional AI models, e.g., OpenAI’s Whisper models.
LLMs are now increasingly used as information processors that can handle data in different formats. Integrating multiple modalities into LLMs opens up numerous applications in education, business, and other sectors. One such application is processing educational videos, documentaries, webinars, presentations, business meetings, lectures, and other content with LLMs and interacting with this content more naturally. The audio modality in these videos carries rich information that can serve a number of applications. In educational settings, it can support personalized learning, enhance accessibility for students with special needs, help create study aids, enable remote learning without requiring a teacher’s presence to understand content, and assess students’ knowledge of a topic. In business settings, it can be used to train new employees with onboarding videos, extract and generate knowledge from recorded meetings and presentations, build customized learning materials from product demonstration videos, and extract insights from recorded industry conferences without watching hours of video, to name a few.
This article discusses the development of an application to interact with videos in a natural way and create learning content from them. The application has the following features:
- Takes an input video either through a URL or from a local path and extracts its audio
- Transcribes the audio using OpenAI’s state-of-the-art gpt-4o-transcribe model, which has demonstrated improved Word Error Rate (WER) performance over existing Whisper models across multiple established benchmarks
- Creates a vector store from the transcript and builds a retrieval-augmented generation (RAG) pipeline to establish a conversation with the video transcript
- Responds to users’ questions in text and speech, using different voices selectable from the application’s UI
- Creates learning content such as:
  - A hierarchical representation of the video contents to provide users with quick insights into the main concepts and supporting details
  - Quizzes that transform passive video watching into active learning by challenging users to recall and apply information presented in the video
  - Flashcards from the video content that support active recall and spaced repetition learning techniques
The entire workflow of the application is shown in the following figure.
The whole codebase, along with detailed instructions for installation and usage, is available on GitHub.
Here is the structure of the GitHub repository. The main Streamlit application implements the GUI and calls several functions from the other feature and helper modules (.py files).
In addition, you can visualize the codebase by opening the “codebase visualization” HTML file in a browser, which describes the structure of each module.
Let’s delve into the step-by-step development of this application. I will not discuss the entire code, only its major parts; the whole code in the GitHub repository is adequately commented.
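Before diving into the modules, here is a condensed sketch of how the pieces fit together. This is my own illustration rather than a file from the repo, and it assumes the module, function, and class names described in the sections below:

# Illustrative glue code (not in the repo): wiring together the modules described below
from transcriber import process_video, transcript_exists, get_transcript_path
from rag_system import VideoRAG
from summary import SummaryGenerator
from quiz import QuizGenerator
from flashcards import FlashcardGenerator

url = "https://www.youtube.com/watch?v=..."   # hypothetical input video
out_dir, api_key = "output", "YOUR_OPENAI_API_KEY"

# 1. Get (or reuse) the transcript
if transcript_exists(url, out_dir):
    with open(get_transcript_path(url, out_dir), encoding="utf-8") as f:
        transcript = f.read()
else:
    transcript = process_video(url, out_dir, api_key)["transcript"]

# 2. Build the RAG pipeline and chat with the video
rag = VideoRAG(api_key=api_key)
rag.create_vector_store(transcript)
print(rag.ask("What is this video about?"))

# 3. Generate learning content
summary = SummaryGenerator().generate_summary(rag, api_key)
quiz = QuizGenerator().generate_quiz(rag, api_key, transcript=transcript)
cards = FlashcardGenerator().generate_flashcards(rag, api_key, transcript=transcript)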
Video Input and Processing
Video input and processing logic are implemented in transcriber.py. When the application loads, it verifies whether FFmpeg is present (verify_ffmpeg) in the application’s root directory. FFmpeg is required for downloading a video (if the input is a URL) and for extracting audio from the video, which is then used to create the transcript.
def verify_ffmpeg():
    """Verify that FFmpeg is available and print its location."""
    # Add FFmpeg to PATH
    os.environ['PATH'] = FFMPEG_LOCATION + os.pathsep + os.environ['PATH']
    # Check if FFmpeg binaries exist
    ffmpeg_path = os.path.join(FFMPEG_LOCATION, 'ffmpeg.exe')
    ffprobe_path = os.path.join(FFMPEG_LOCATION, 'ffprobe.exe')
    if not os.path.exists(ffmpeg_path):
        raise FileNotFoundError(f"FFmpeg executable not found at: {ffmpeg_path}")
    if not os.path.exists(ffprobe_path):
        raise FileNotFoundError(f"FFprobe executable not found at: {ffprobe_path}")
    print(f"FFmpeg found at: {ffmpeg_path}")
    print(f"FFprobe found at: {ffprobe_path}")
    # Try to execute FFmpeg to make sure it works
    try:
        # Add shell=True for Windows and capture errors properly
        result = subprocess.run([ffmpeg_path, '-version'],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                shell=True,  # This can help with permission issues on Windows
                                check=False)
        if result.returncode == 0:
            print(f"FFmpeg version: {result.stdout.decode().splitlines()[0]}")
        else:
            error_msg = result.stderr.decode()
            print(f"FFmpeg error: {error_msg}")
            # Check for specific permission errors
            if "Access is denied" in error_msg:
                print("Permission error detected. Trying alternative approach...")
                # Try an alternative approach - just check file existence without execution
                if os.path.exists(ffmpeg_path) and os.path.exists(ffprobe_path):
                    print("FFmpeg files exist but execution test failed due to permissions.")
                    print("WARNING: The app may fail when trying to process videos.")
                    # Return paths anyway and hope for the best when actually used
                    return ffmpeg_path, ffprobe_path
            raise RuntimeError(f"FFmpeg execution failed: {error_msg}")
    except Exception as e:
        print(f"Error checking FFmpeg: {e}")
        # Fallback option if verification fails but files exist
        if os.path.exists(ffmpeg_path) and os.path.exists(ffprobe_path):
            print("WARNING: FFmpeg files exist but verification failed.")
            print("Attempting to continue anyway, but video processing may fail.")
            return ffmpeg_path, ffprobe_path
        raise
    return ffmpeg_path, ffprobe_path
The video input is either a URL (for instance, a YouTube URL) or a local file path. The process_video function determines the input type and routes it accordingly. If the input is a URL, the helper functions get_video_info and get_video_id extract video metadata (title, description, duration) without downloading it, using the yt_dlp package.
# Function to determine the input type and route it appropriately
def process_video(youtube_url, output_dir, api_key, model="gpt-4o-transcribe"):
    """
    Process a YouTube video to generate a transcript.
    Wrapper function that combines download and transcription.

    Args:
        youtube_url: URL of the YouTube video
        output_dir: Directory to save the output
        api_key: OpenAI API key
        model: The model to use for transcription (default: gpt-4o-transcribe)

    Returns:
        dict: Dictionary containing transcript and file paths
    """
    # First download the audio
    print("Downloading video...")
    audio_path = process_video_download(youtube_url, output_dir)
    print("Transcribing video...")
    # Then transcribe the audio
    transcript, transcript_path = process_video_transcribe(audio_path, output_dir, api_key, model=model)
    # Return the combined results
    return {
        'transcript': transcript,
        'transcript_path': transcript_path,
        'audio_path': audio_path
    }

def get_video_info(youtube_url):
    """Get video information without downloading."""
    # Check local cache first
    global _video_info_cache
    if youtube_url in _video_info_cache:
        return _video_info_cache[youtube_url]
    # Extract info if not cached
    with yt_dlp.YoutubeDL() as ydl:
        info = ydl.extract_info(youtube_url, download=False)
        # Cache the result
        _video_info_cache[youtube_url] = info
        # Also cache the video ID separately
        _video_id_cache[youtube_url] = info.get('id', 'video')
        return info

def get_video_id(youtube_url):
    """Get just the video ID without re-extracting if already known."""
    global _video_id_cache
    if youtube_url in _video_id_cache:
        return _video_id_cache[youtube_url]
    # If not in cache, extract from URL directly if possible
    if "v=" in youtube_url:
        video_id = youtube_url.split("v=")[1].split("&")[0]
        _video_id_cache[youtube_url] = video_id
        return video_id
    elif "youtu.be/" in youtube_url:
        video_id = youtube_url.split("youtu.be/")[1].split("?")[0]
        _video_id_cache[youtube_url] = video_id
        return video_id
    # If we can't extract directly, fall back to full info extraction
    info = get_video_info(youtube_url)
    video_id = info.get('id', 'video')
    return video_id
After the video input is given, the code in app.py checks whether a transcript for the input video already exists (in the case of URL input). This is done by calling the following two helper functions from transcriber.py.
def get_transcript_path(youtube_url, output_dir):
    """Get the expected transcript path for a given YouTube URL."""
    # Get video ID with caching
    video_id = get_video_id(youtube_url)
    # Return expected transcript path
    return os.path.join(output_dir, f"{video_id}_transcript.txt")

def transcript_exists(youtube_url, output_dir):
    """Check if a transcript already exists for this video."""
    transcript_path = get_transcript_path(youtube_url, output_dir)
    return os.path.exists(transcript_path)
If transcript_exists finds an existing transcript, the next step is to create the vector store for the RAG pipeline. If no existing transcript is found, the next step is to download audio from the URL and convert it to a standard audio format. The process_video_download function downloads the audio using the yt_dlp package (with FFmpeg as a post-processor) and converts it to .mp3 format. If the input is a local video file, app.py proceeds to convert it to an .mp3 file.
def process_video_download(youtube_url, output_dir):
    """
    Download audio from a YouTube video.

    Args:
        youtube_url: URL of the YouTube video
        output_dir: Directory to save the output

    Returns:
        str: Path to the downloaded audio file
    """
    # Create output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)
    # Extract video ID from URL
    video_id = None
    if "v=" in youtube_url:
        video_id = youtube_url.split("v=")[1].split("&")[0]
    elif "youtu.be/" in youtube_url:
        video_id = youtube_url.split("youtu.be/")[1].split("?")[0]
    else:
        raise ValueError("Could not extract video ID from URL")
    # Set output paths
    audio_path = os.path.join(output_dir, f"{video_id}.mp3")
    # Configure yt-dlp options
    ydl_opts = {
        'format': 'bestaudio/best',
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': '192',
        }],
        'outtmpl': os.path.join(output_dir, f"{video_id}"),
        'quiet': True
    }
    # Download audio
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        ydl.download([youtube_url])
    # Verify audio file exists
    if not os.path.exists(audio_path):
        # Try with an extension that yt-dlp might have used
        potential_paths = [
            os.path.join(output_dir, f"{video_id}.mp3"),
            os.path.join(output_dir, f"{video_id}.m4a"),
            os.path.join(output_dir, f"{video_id}.webm")
        ]
        for path in potential_paths:
            if os.path.exists(path):
                # Convert to mp3 if it's not already
                if not path.endswith('.mp3'):
                    ffmpeg_path = verify_ffmpeg()[0]
                    output_mp3 = os.path.join(output_dir, f"{video_id}.mp3")
                    subprocess.run([
                        ffmpeg_path, '-i', path, '-c:a', 'libmp3lame',
                        '-q:a', '2', output_mp3, '-y'
                    ], check=True, capture_output=True)
                    os.remove(path)  # Remove the original file
                    audio_path = output_mp3
                else:
                    audio_path = path
                break
        else:
            raise FileNotFoundError(f"Could not find downloaded audio file for video {video_id}")
    return audio_path
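The corresponding local-file branch in app.py is not reproduced here. A minimal sketch of that conversion, using a hypothetical helper that reuses verify_ffmpeg and the same FFmpeg flags as the URL path, could look like this:

# Hypothetical helper mirroring the local-file conversion in app.py
def extract_audio_from_local_video(video_path, output_dir):
    """Convert a local video file to .mp3 using the bundled FFmpeg."""
    os.makedirs(output_dir, exist_ok=True)
    ffmpeg_path = verify_ffmpeg()[0]  # reuse the FFmpeg check shown earlier
    base_name = os.path.splitext(os.path.basename(video_path))[0]
    audio_path = os.path.join(output_dir, f"{base_name}.mp3")
    # -vn drops the video stream; libmp3lame encodes the audio track to mp3
    subprocess.run([
        ffmpeg_path, '-i', video_path, '-vn',
        '-c:a', 'libmp3lame', '-q:a', '2', audio_path, '-y'
    ], check=True, capture_output=True)
    return audio_path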
Audio Transcription Using OpenAI’s gpt-4o-transcribe Model
After extracting the audio and converting it to a standard format, the next step is to transcribe it to text. For this purpose, I used OpenAI’s newly launched gpt-4o-transcribe speech-to-text model, accessible through the speech-to-text API. This model has outperformed OpenAI’s Whisper models in terms of both transcription accuracy and language coverage.
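For a file that is already within the API limits, the transcription itself is a single request. Here is a minimal sketch, with a hypothetical file name and the API key passed directly for illustration:

from openai import OpenAI

client = OpenAI(api_key="YOUR_OPENAI_API_KEY")  # illustration only; the app takes the key from its settings

# Direct transcription of a small audio file with gpt-4o-transcribe
with open("webinar_audio.mp3", "rb") as audio_file:  # hypothetical file name
    response = client.audio.transcriptions.create(
        model="gpt-4o-transcribe",
        file=audio_file
    )
print(response.text)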
The function process_video_transcribe in transcriber.py receives the converted audio file and interfaces with the gpt-4o-transcribe model through OpenAI’s speech-to-text API. The gpt-4o-transcribe model currently has a limit of 25 MB file size and 1,500 seconds of duration per request. To overcome this limitation, I split longer files into multiple chunks and transcribe the chunks separately. The process_video_transcribe function checks whether the input file exceeds the size and/or duration limit. If either threshold is exceeded, it calls the split_and_transcribe function, which first calculates the number of chunks needed based on both size and duration and takes the maximum of the two as the final number of chunks for transcription. It then finds the start and end times for each chunk, extracts the chunks from the audio file, transcribes each chunk using the gpt-4o-transcribe model, and finally combines the transcripts of all chunks to generate the final transcript.
def process_video_transcribe(audio_path, output_dir, api_key, progress_callback=None, model="gpt-4o-transcribe"):
    """
    Transcribe an audio file using OpenAI API, with automatic chunking for large files.
    Always uses the selected model, with no fallback.

    Args:
        audio_path: Path to the audio file
        output_dir: Directory to save the transcript
        api_key: OpenAI API key
        progress_callback: Function to call with progress updates (0-100)
        model: The model to use for transcription (default: gpt-4o-transcribe)

    Returns:
        tuple: (transcript text, transcript path)
    """
    # Extract video ID from audio path
    video_id = os.path.basename(audio_path).split('.')[0]
    transcript_path = os.path.join(output_dir, f"{video_id}_transcript.txt")
    # Setup OpenAI client
    client = OpenAI(api_key=api_key)
    # Update progress
    if progress_callback:
        progress_callback(10)
    # Get file size in MB
    file_size_mb = os.path.getsize(audio_path) / (1024 * 1024)
    # Universal chunking thresholds - apply to both models
    max_size_mb = 25               # 25MB chunk size for both models
    max_duration_seconds = 1500    # 1500 seconds chunk duration for both models
    # Load the audio file to get its duration
    try:
        audio = AudioSegment.from_file(audio_path)
        duration_seconds = len(audio) / 1000  # pydub uses milliseconds
    except Exception as e:
        print(f"Error loading audio to check duration: {e}")
        audio = None
        duration_seconds = 0
    # Determine if chunking is needed
    needs_chunking = False
    chunking_reason = []
    if file_size_mb > max_size_mb:
        needs_chunking = True
        chunking_reason.append(f"size ({file_size_mb:.2f}MB exceeds {max_size_mb}MB)")
    if duration_seconds > max_duration_seconds:
        needs_chunking = True
        chunking_reason.append(f"duration ({duration_seconds:.2f}s exceeds {max_duration_seconds}s)")
    # Log the decision
    if needs_chunking:
        reason_str = " and ".join(chunking_reason)
        print(f"Audio needs chunking due to {reason_str}. Using {model} for transcription.")
    else:
        print(f"Audio file is within limits. Using {model} for direct transcription.")
    # Check if file needs chunking
    if needs_chunking:
        if progress_callback:
            progress_callback(15)
        # Split the audio file into chunks and transcribe each chunk using the selected model only
        full_transcript = split_and_transcribe(
            audio_path, client, model, progress_callback,
            max_size_mb, max_duration_seconds, audio
        )
    else:
        # File is small enough, transcribe directly with the selected model
        with open(audio_path, "rb") as audio_file:
            if progress_callback:
                progress_callback(30)
            transcript_response = client.audio.transcriptions.create(
                model=model,
                file=audio_file
            )
            if progress_callback:
                progress_callback(80)
            full_transcript = transcript_response.text
    # Save transcript to file
    with open(transcript_path, "w", encoding="utf-8") as f:
        f.write(full_transcript)
    # Update progress
    if progress_callback:
        progress_callback(100)
    return full_transcript, transcript_path
def split_and_transcribe(audio_path, client, model, progress_callback=None,
                         max_size_mb=25, max_duration_seconds=1500, audio=None):
    """
    Split an audio file into chunks and transcribe each chunk.

    Args:
        audio_path: Path to the audio file
        client: OpenAI client
        model: Model to use for transcription (will not fall back to other models)
        progress_callback: Function to call with progress updates
        max_size_mb: Maximum file size in MB
        max_duration_seconds: Maximum duration in seconds
        audio: Pre-loaded AudioSegment (optional)

    Returns:
        str: Combined transcript from all chunks
    """
    # Load the audio file if not provided
    if audio is None:
        audio = AudioSegment.from_file(audio_path)
    # Get audio duration in seconds
    duration_seconds = len(audio) / 1000
    # Calculate the number of chunks needed based on both size and duration
    file_size_mb = os.path.getsize(audio_path) / (1024 * 1024)
    chunks_by_size = math.ceil(file_size_mb / (max_size_mb * 0.9))                      # Use 90% of max to be safe
    chunks_by_duration = math.ceil(duration_seconds / (max_duration_seconds * 0.95))    # Use 95% of max to be safe
    num_chunks = max(chunks_by_size, chunks_by_duration)
    print(f"Splitting audio into {num_chunks} chunks based on size ({chunks_by_size}) and duration ({chunks_by_duration})")
    # Calculate chunk duration in milliseconds
    chunk_length_ms = len(audio) // num_chunks
    # Create temp directory for chunks if it doesn't exist
    temp_dir = os.path.join(os.path.dirname(audio_path), "temp_chunks")
    os.makedirs(temp_dir, exist_ok=True)
    # Split the audio into chunks and transcribe each chunk
    transcripts = []
    for i in range(num_chunks):
        if progress_callback:
            # Update progress: 20% for splitting, 60% for transcribing
            progress_percent = 20 + int((i / num_chunks) * 60)
            progress_callback(progress_percent)
        # Calculate start and end times for this chunk
        start_ms = i * chunk_length_ms
        end_ms = min((i + 1) * chunk_length_ms, len(audio))
        # Extract the chunk
        chunk = audio[start_ms:end_ms]
        # Save the chunk to a temporary file
        chunk_path = os.path.join(temp_dir, f"chunk_{i}.mp3")
        chunk.export(chunk_path, format="mp3")
        # Log chunk information
        chunk_size_mb = os.path.getsize(chunk_path) / (1024 * 1024)
        chunk_duration = len(chunk) / 1000
        print(f"Chunk {i+1}/{num_chunks}: {chunk_size_mb:.2f}MB, {chunk_duration:.2f}s")
        # Transcribe the chunk
        try:
            with open(chunk_path, "rb") as chunk_file:
                transcript_response = client.audio.transcriptions.create(
                    model=model,
                    file=chunk_file
                )
                # Add to our list of transcripts
                transcripts.append(transcript_response.text)
        except Exception as e:
            print(f"Error transcribing chunk {i+1} with {model}: {e}")
            # Add a placeholder for the failed chunk
            transcripts.append(f"[Transcription failed for segment {i+1}]")
        # Clean up the temporary chunk file
        os.remove(chunk_path)
    # Clean up the temporary directory
    try:
        os.rmdir(temp_dir)
    except OSError:
        print(f"Note: Could not remove temporary directory {temp_dir}")
    # Combine all transcripts with proper spacing
    full_transcript = " ".join(transcripts)
    return full_transcript
The following screenshot of the Streamlit app shows the video processing and transcribing workflow for one of my webinars, “Integrating LLMs into Business,” available on my YouTube channel.
Retrieval Augmented Generation (RAG) for Interactive Conversations
After generating the video transcript, the application builds a RAG pipeline to facilitate both text- and speech-based interactions. The conversational intelligence is implemented through the VideoRAG class in rag_system.py, which initializes the chunk size and overlap, OpenAI embeddings, a ChatOpenAI instance that generates responses with the gpt-4o model, and a ConversationBufferMemory to maintain chat history for contextual continuity.
The create_vector_store method splits the transcript into chunks and creates a vector store using the FAISS vector database. The handle_question_submission method processes text questions and appends each new question and its answer to the conversation history. The handle_speech_input function implements the complete voice-to-text-to-voice pipeline: it records the question audio, transcribes the question, processes the query through the RAG system, and synthesizes speech for the response.
class VideoRAG:
    def __init__(self, api_key=None, chunk_size=1000, chunk_overlap=200):
        """Initialize the RAG system with OpenAI API key."""
        # Use provided API key or try to get from environment
        self.api_key = api_key if api_key else st.secrets["OPENAI_API_KEY"]
        if not self.api_key:
            raise ValueError("OpenAI API key is required either as parameter or environment variable")
        self.embeddings = OpenAIEmbeddings(openai_api_key=self.api_key)
        self.llm = ChatOpenAI(
            openai_api_key=self.api_key,
            model="gpt-4o",
            temperature=0
        )
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.vector_store = None
        self.chain = None
        self.memory = ConversationBufferMemory(
            memory_key="chat_history",
            return_messages=True
        )

    def create_vector_store(self, transcript):
        """Create a vector store from the transcript."""
        # Split the text into chunks
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=self.chunk_size,
            chunk_overlap=self.chunk_overlap,
            separators=["\n\n", "\n", " ", ""]
        )
        chunks = text_splitter.split_text(transcript)
        # Create vector store
        self.vector_store = FAISS.from_texts(chunks, self.embeddings)
        # Create prompt template for the RAG system
        system_template = """You are a specialized AI assistant that answers questions about a specific video.
You have access to snippets from the video transcript, and your role is to provide accurate information ONLY based on these snippets.
Guidelines:
1. Only answer questions based on the information provided in the context from the video transcript, otherwise say that "I don't know. The video doesn't cover that information."
2. The question may ask you to summarize the video or tell what the video is about. In that case, present a summary of the context.
3. Don't make up information or use knowledge from outside the provided context
4. Keep your answers concise and directly related to the question
5. If asked about your capabilities or identity, explain that you're an AI assistant that specializes in answering questions about this specific video

Context from the video transcript:
{context}

Chat History:
{chat_history}
"""
        user_template = "{question}"
        # Create the messages for the chat prompt
        messages = [
            SystemMessagePromptTemplate.from_template(system_template),
            HumanMessagePromptTemplate.from_template(user_template)
        ]
        # Create the chat prompt
        qa_prompt = ChatPromptTemplate.from_messages(messages)
        # Initialize the RAG chain with the custom prompt
        self.chain = ConversationalRetrievalChain.from_llm(
            llm=self.llm,
            retriever=self.vector_store.as_retriever(
                search_kwargs={"k": 5}
            ),
            memory=self.memory,
            combine_docs_chain_kwargs={"prompt": qa_prompt},
            verbose=True
        )
        return len(chunks)

    def set_chat_history(self, chat_history):
        """Set chat history from external session state."""
        if not self.memory:
            return
        # Clear existing memory
        self.memory.clear()
        # Convert standard chat history format to LangChain message format
        for message in chat_history:
            if message["role"] == "user":
                self.memory.chat_memory.add_user_message(message["content"])
            elif message["role"] == "assistant":
                self.memory.chat_memory.add_ai_message(message["content"])

    def ask(self, question, chat_history=None):
        """Ask a question to the RAG system."""
        if not self.chain:
            raise ValueError("Vector store not initialized. Call create_vector_store first.")
        # If chat history is provided, update the memory
        if chat_history:
            self.set_chat_history(chat_history)
        # Get response
        response = self.chain.invoke({"question": question})
        return response["answer"]
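The speech path (handle_speech_input in app.py) is not reproduced in this excerpt. A simplified sketch of the voice-to-text-to-voice loop, assuming OpenAI's transcription and text-to-speech endpoints and a pre-recorded question file (the model name tts-1 and the helper name are my choices for illustration), might look like this:

from openai import OpenAI

def answer_spoken_question(rag, api_key, question_audio_path, voice="alloy", reply_audio_path="answer.mp3"):
    """Simplified sketch of the voice-to-text-to-voice loop, not the exact app.py implementation."""
    client = OpenAI(api_key=api_key)
    # 1. Transcribe the recorded question
    with open(question_audio_path, "rb") as f:
        question_text = client.audio.transcriptions.create(
            model="gpt-4o-transcribe", file=f
        ).text
    # 2. Answer it through the RAG chain shown above
    answer_text = rag.ask(question_text)
    # 3. Synthesize speech for the answer with the selected voice
    speech = client.audio.speech.create(model="tts-1", voice=voice, input=answer_text)
    speech.stream_to_file(reply_audio_path)
    return question_text, answer_text, reply_audio_path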
See the following snapshot of the Streamlit app, showing the interactive conversation interface with the video.
The following snapshot shows a conversation with the video using speech input and text + speech output.
Feature Generation
The application generates three features: a hierarchical summary, quizzes, and flashcards. Please refer to their respective commented code in the GitHub repo.
The SummaryGenerator class in summary.py provides structured content summarization by creating a hierarchical representation of the video content, giving users quick insight into the main concepts and supporting details. The system retrieves key contextual segments from the transcript using RAG. Using a prompt (see generate_summary), it creates a hierarchical summary with three levels: main points, sub-points, and additional details. The create_summary_popup_html method transforms the generated summary into an interactive visual representation using CSS and JavaScript.
# summary.py
class SummaryGenerator:
    def __init__(self):
        pass

    def generate_summary(self, rag_system, api_key, model="gpt-4o", temperature=0.2):
        """
        Generate a hierarchical bullet-point summary from the video transcript

        Args:
            rag_system: The RAG system with vector store
            api_key: OpenAI API key
            model: Model to use for summary generation
            temperature: Creativity level (0.0-1.0)

        Returns:
            str: Hierarchical bullet-point summary text
        """
        if not rag_system:
            st.error("Please transcribe the video first before creating a summary!")
            return ""
        with st.spinner("Generating hierarchical summary..."):
            # Create LLM for summary generation
            summary_llm = ChatOpenAI(
                openai_api_key=api_key,
                model=model,
                temperature=temperature  # Lower temperature for more factual summaries
            )
            # Define the prompt up front so it is also available to the RAG fallback below
            prompt = """Based on the video transcript, create a hierarchical bullet-point summary of the content.
Structure your summary with exactly these levels:
• Main points (use • or * at the start of the line for these top-level points)
  - Sub-points (use - at the start of the line for these second-level details)
    * Additional details (use spaces followed by * for third-level points)
For example:
• First main point
  - Important detail about the first point
  - Another important detail
    * A specific example
    * Another specific example
• Second main point
  - Detail about second point
Be consistent with the exact formatting shown above. Each bullet level must start with the exact character shown (• or *, -, and spaces+*).
Create 3-5 main points with 2-4 sub-points each, and add third-level details where appropriate.
Focus on the most important information from the video.
"""
            # Use the RAG system to get relevant context
            try:
                # Get broader context since we're summarizing the whole video
                relevant_docs = rag_system.vector_store.similarity_search(
                    "summarize the main points of this video", k=10
                )
                context = "\n\n".join([doc.page_content for doc in relevant_docs])
                # Use the LLM with context to generate the summary
                messages = [
                    {"role": "system", "content": f"You are given the following context from a video transcript:\n\n{context}\n\nUse this context to create a hierarchical summary according to the instructions."},
                    {"role": "user", "content": prompt}
                ]
                response = summary_llm.invoke(messages)
                return response.content
            except Exception as e:
                # Fallback to the regular RAG system if there's an error
                st.warning(f"Using standard summary generation due to error: {str(e)}")
                return rag_system.ask(prompt)
    def create_summary_popup_html(self, summary_content):
        """
        Create HTML for the summary popup with properly formatted hierarchical bullets

        Args:
            summary_content: Raw summary text with markdown bullet formatting

        Returns:
            str: HTML for the popup with properly formatted bullets
        """
        # Instead of relying on markdown conversion, manually parse the bullet markers
        # and emit nested <ul>/<li> tags so the three levels render correctly
        lines = summary_content.strip().split('\n')
        formatted_html = []
        list_level = 0  # 0 = not in a list, 1-3 = bullet nesting depth

        def set_level(target_level):
            """Open or close <ul> tags until the nesting depth matches target_level."""
            nonlocal list_level
            while list_level < target_level:
                formatted_html.append('<ul>')
                list_level += 1
            while list_level > target_level:
                formatted_html.append('</ul>')
                list_level -= 1

        for line in lines:
            stripped = line.strip()
            # Skip empty lines
            if not stripped:
                continue
            if stripped.startswith('# '):
                # Markdown header (header tag choice is illustrative)
                set_level(0)
                formatted_html.append(f'<h3>{stripped[2:]}</h3>')
            elif stripped.startswith('• ') or (stripped.startswith('* ') and not line.startswith((' ', '\t'))):
                # Top-level bullet (• or an unindented *)
                set_level(1)
                formatted_html.append(f'<li>{stripped[2:].strip()}</li>')
            elif stripped.startswith('- '):
                # Second-level bullet
                set_level(2)
                formatted_html.append(f'<li>{stripped[2:].strip()}</li>')
            elif stripped.startswith('* '):
                # Third-level bullet (indented *)
                set_level(3)
                formatted_html.append(f'<li>{stripped[2:].strip()}</li>')
            else:
                # Regular paragraph
                set_level(0)
                formatted_html.append(f'<p>{stripped}</p>')
        # Close any lists still open
        set_level(0)
        summary_html = '\n'.join(formatted_html)
        # The repository version wraps summary_html in popup markup with CSS and JavaScript
        # for styling and the open/close behavior; only the content wrapper is shown here
        html = f"""
        <div>
        {summary_html}
        </div>
        """
        return html

The Talk-to-Videos app generates quizzes from the video through the QuizGenerator class in quiz.py. The quiz generator creates multiple-choice questions targeting specific facts and concepts presented in the video. Unlike the RAG pipeline, where I use zero temperature, I increased the LLM temperature to 0.4 to encourage some creativity in quiz generation. A structured prompt guides the quiz generation process. The parse_quiz_response method extracts and validates the generated quiz elements so that each question has all the required components. To prevent users from recognizing a pattern and to promote real understanding, the quiz generator shuffles the answer options. Questions are presented one at a time, followed by immediate feedback on each answer. After all questions are completed, the calculate_quiz_results method assesses the user's answers, and the user is presented with an overall score, a visual breakdown of correct versus incorrect answers, and feedback on the performance level. In this way, the quiz functionality transforms passive video watching into active learning by challenging users to recall and apply information presented in the video. A short usage sketch follows the class code below.
# quiz.py
class QuizGenerator:
    def __init__(self):
        pass

    def generate_quiz(self, rag_system, api_key, transcript=None, model="gpt-4o", temperature=0.4):
        """
        Generate quiz questions based on the video transcript

        Args:
            rag_system: The RAG system with vector store
            api_key: OpenAI API key
            transcript: The full transcript text (optional)
            model: Model to use for question generation
            temperature: Creativity level (0.0-1.0)

        Returns:
            list: List of question objects
        """
        if not rag_system:
            st.error("Please transcribe the video first before creating a quiz!")
            return []
        # Create a temporary LLM with slightly higher temperature for more creative questions
        creative_llm = ChatOpenAI(
            openai_api_key=api_key,
            model=model,
            temperature=temperature
        )
        num_questions = 10
        # Prompt to generate quiz
        prompt = f"""Based on the video transcript, generate {num_questions} multiple-choice questions to test understanding of the content.
For each question:
1. The question should be specific to information mentioned in the video
2. Include 4 options (A, B, C, D)
3. Clearly indicate the correct answer
Format your response exactly as follows for each question:
QUESTION: [question text]
A: [option A]
B: [option B]
C: [option C]
D: [option D]
CORRECT: [letter of correct answer]
Make sure all questions are based on facts from the video."""
        try:
            if transcript:
                # If we have the full transcript, use it
                messages = [
                    {"role": "system", "content": f"You are given the following transcript from a video:\n\n{transcript}\n\nUse this transcript to create quiz questions according to the instructions."},
                    {"role": "user", "content": prompt}
                ]
                response = creative_llm.invoke(messages)
                response_text = response.content
            else:
                # Fallback to RAG approach if no transcript is provided
                relevant_docs = rag_system.vector_store.similarity_search(
                    "what are the main topics covered in this video?", k=5
                )
                context = "\n\n".join([doc.page_content for doc in relevant_docs])
                # Use the creative LLM with context to generate questions
                messages = [
                    {"role": "system", "content": f"You are given the following context from a video transcript:\n\n{context}\n\nUse this context to create quiz questions according to the instructions."},
                    {"role": "user", "content": prompt}
                ]
                response = creative_llm.invoke(messages)
                response_text = response.content
        except Exception as e:
            # Fallback to the regular RAG system if there's an error
            st.warning(f"Using standard question generation due to error: {str(e)}")
            response_text = rag_system.ask(prompt)
        return self.parse_quiz_response(response_text)

    def parse_quiz_response(self, response_text):
        """
        Parse the LLM response to extract questions, options, and correct answers

        Args:
            response_text: Raw text response from LLM

        Returns:
            list: List of parsed question objects
        """
        quiz_questions = []
        current_question = {}
        for line in response_text.strip().split('\n'):
            line = line.strip()
            if line.startswith('QUESTION:'):
                if current_question and 'question' in current_question and 'options' in current_question and 'correct' in current_question:
                    quiz_questions.append(current_question)
                current_question = {
                    'question': line[len('QUESTION:'):].strip(),
                    'options': [],
                    'correct': None
                }
            elif line.startswith(('A:', 'B:', 'C:', 'D:')):
                option_letter = line[0]
                option_text = line[2:].strip()
                current_question.setdefault('options', []).append((option_letter, option_text))
            elif line.startswith('CORRECT:'):
                current_question['correct'] = line[len('CORRECT:'):].strip()
        # Add the last question
        if current_question and 'question' in current_question and 'options' in current_question and 'correct' in current_question:
            quiz_questions.append(current_question)
        # Randomize options for each question
        randomized_questions = []
        for q in quiz_questions:
            # Get the original correct answer
            correct_letter = q['correct']
            correct_option = None
            # Find the correct option text
            for letter, text in q['options']:
                if letter == correct_letter:
                    correct_option = text
                    break
            if correct_option is None:
                # If we can't find the correct answer, keep the question as is
                randomized_questions.append(q)
                continue
            # Create lists of the option texts and the original letters
            option_texts = [text for _, text in q['options']]
            option_letters = [letter for letter, _ in q['options']]
            # Create a list of (letter, text) pairs and shuffle them
            options_pairs = list(zip(option_letters, option_texts))
            random.shuffle(options_pairs)
            # Find the new position of the correct answer
            new_correct_letter = None
            for letter, text in options_pairs:
                if text == correct_option:
                    new_correct_letter = letter
                    break
            # Create a new question with randomized options
            new_q = {
                'question': q['question'],
                'options': options_pairs,
                'correct': new_correct_letter
            }
            randomized_questions.append(new_q)
        return randomized_questions

    def calculate_quiz_results(self, questions, user_answers):
        """
        Calculate quiz results based on user answers

        Args:
            questions: List of question objects
            user_answers: Dictionary of user answers keyed by question_key

        Returns:
            tuple: (results dict, correct count)
        """
        correct_count = 0
        results = {}
        for i, question in enumerate(questions):
            question_key = f"quiz_q_{i}"
            user_answer = user_answers.get(question_key)
            correct_answer = question['correct']
            # Only count as correct if the user selected an answer and it matches
            is_correct = user_answer is not None and user_answer == correct_answer
            if is_correct:
                correct_count += 1
            results[question_key] = {
                'user_answer': user_answer,
                'correct_answer': correct_answer,
                'is_correct': is_correct
            }
        return results, correct_count
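As a quick usage sketch (with hypothetical user answers, and assuming an initialized VideoRAG instance rag, an api_key, and a transcript as in the earlier sections), the quiz flow outside the Streamlit UI boils down to:

# Hypothetical usage of QuizGenerator outside the Streamlit UI
quiz_gen = QuizGenerator()
questions = quiz_gen.generate_quiz(rag, api_key, transcript=transcript)

# Pretend the user answered "A" to every question (the app collects these through its UI)
user_answers = {f"quiz_q_{i}": "A" for i in range(len(questions))}
results, correct = quiz_gen.calculate_quiz_results(questions, user_answers)
print(f"Score: {correct}/{len(questions)}")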

Talk-to-Videos also generates flashcards from the video content, which support active recall and spaced repetition learning techniques. This is done through the FlashcardGenerator class in flashcards.py, which creates a mix of flashcard types focusing on key term definitions, conceptual questions, fill-in-the-blank statements, and true/false questions with explanations. A prompt guides the LLM to output flashcards in a structured JSON format, with each card containing distinct “front” and “back” elements. The shuffle_flashcards method produces a randomized presentation, and each flashcard is validated to ensure that it contains both front and back components before being presented to the user. The answer to each flashcard is initially hidden and revealed on the user’s input, using classic flashcard reveal functionality. Users can generate a new set of flashcards for more practice. The flashcard and quiz systems are interconnected, so users can switch between them as needed. A brief usage sketch follows the class code below.
# flashcards.py
class FlashcardGenerator:
    """Class to generate flashcards from video content using the RAG system."""

    def __init__(self):
        """Initialize the flashcard generator."""
        pass

    def generate_flashcards(self, rag_system, api_key, transcript=None, num_cards=10, model="gpt-4o") -> List[Dict[str, str]]:
        """
        Generate flashcards based on the video content.

        Args:
            rag_system: The initialized RAG system with video content
            api_key: OpenAI API key
            transcript: The full transcript text (optional)
            num_cards: Number of flashcards to generate (default: 10)
            model: The OpenAI model to use

        Returns:
            List of flashcard dictionaries with 'front' and 'back' keys
        """
        # Import here to avoid circular imports
        from langchain_openai import ChatOpenAI
        # Initialize language model
        llm = ChatOpenAI(
            openai_api_key=api_key,
            model=model,
            temperature=0.4
        )
        # Create the prompt for flashcard generation
        prompt = f"""
        Create {num_cards} educational flashcards based on the video content.
        Each flashcard should have:
        1. A front side with a question, term, or concept
        2. A back side with the answer, definition, or explanation
        Focus on the most important and educational content from the video.
        Create a mix of different types of flashcards:
        - Key term definitions
        - Conceptual questions
        - Fill-in-the-blank statements
        - True/False questions with explanations
        Format your response as a JSON array of objects with 'front' and 'back' properties.
        Example:
        [
            {{"front": "What is photosynthesis?", "back": "The process by which plants convert light energy into chemical energy."}},
            {{"front": "The three branches of government are: Executive, Legislative, and _____", "back": "Judicial"}}
        ]
        Make sure your output is valid JSON format with exactly {num_cards} flashcards.
        """
        try:
            # Determine the context to use
            if transcript:
                # Use the full transcript if provided
                messages = [
                    {"role": "system", "content": f"You are an educational content creator specializing in creating effective flashcards. Use the following transcript from a video to create educational flashcards:\n\n{transcript}"},
                    {"role": "user", "content": prompt}
                ]
            else:
                # Fallback to RAG system if no transcript is provided
                relevant_docs = rag_system.vector_store.similarity_search(
                    "key points and educational concepts in the video", k=15
                )
                context = "\n\n".join([doc.page_content for doc in relevant_docs])
                messages = [
                    {"role": "system", "content": f"You are an educational content creator specializing in creating effective flashcards. Use the following context from a video to create educational flashcards:\n\n{context}"},
                    {"role": "user", "content": prompt}
                ]
            # Generate flashcards
            response = llm.invoke(messages)
            content = response.content
            # Extract JSON content in case there's text around it
            json_start = content.find('[')
            json_end = content.rfind(']') + 1
            if json_start >= 0 and json_end > json_start:
                json_content = content[json_start:json_end]
                flashcards = json.loads(json_content)
            else:
                # Fallback in case of improper JSON formatting
                raise ValueError("Failed to extract valid JSON from response")
            # Verify we have the expected number of cards (or adjust as needed)
            actual_cards = min(len(flashcards), num_cards)
            flashcards = flashcards[:actual_cards]
            # Validate each flashcard has required fields
            validated_cards = []
            for card in flashcards:
                if 'front' in card and 'back' in card:
                    validated_cards.append({
                        'front': card['front'],
                        'back': card['back']
                    })
            return validated_cards
        except Exception as e:
            # Handle errors gracefully
            print(f"Error generating flashcards: {str(e)}")
            # Return a few basic flashcards in case of error
            return [
                {"front": "Error generating flashcards", "back": f"Please try again. Error: {str(e)}"},
                {"front": "Tip", "back": "Try regenerating flashcards or using a different video"}
            ]

    def shuffle_flashcards(self, flashcards: List[Dict[str, str]]) -> List[Dict[str, str]]:
        """Shuffle the order of flashcards"""
        shuffled = flashcards.copy()
        random.shuffle(shuffled)
        return shuffled
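A corresponding usage sketch for the flashcards, again assuming an initialized rag, api_key, and transcript:

# Hypothetical usage of FlashcardGenerator outside the Streamlit UI
flashcard_gen = FlashcardGenerator()
cards = flashcard_gen.generate_flashcards(rag, api_key, transcript=transcript, num_cards=8)
for card in flashcard_gen.shuffle_flashcards(cards):
    print("Front:", card["front"])
    print("Back: ", card["back"])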

Potential Extensions and Improvements
This application can be extended and improved in a number of ways. For instance:
- Integration of visual features from the video (such as keyframes) alongside the audio to extract more meaningful information.
- Team-based learning experiences, where office colleagues or classmates can share notes, quiz scores, and summaries.
- Navigable transcripts that let users click on a specific section to jump to that point in the video.
- Step-by-step action plans for implementing concepts from the video in real business settings.
- Modifying the RAG prompt to elaborate on answers and provide simpler explanations of difficult concepts.
- Generating questions that build metacognitive skills by prompting learners to reflect on their thinking process and learning strategies while engaging with the video content.
That’s all folks! If you liked the article, please follow me on Medium and LinkedIn.