Build with AI models that can transcribe and understand audio
With a single API call, get access to AI models built on the latest AI breakthroughs to transcribe and understand audio and speech data securely at large scale.
⚠️ WARNING This SDK is intended for testing and light usage only. It is not recommended for use at scale or with production traffic. For best results, we recommend calling the AssemblyAI API directly via HTTP request. See our official documentation for more information, including HTTP code examples.
This repository includes a CLAUDE.md file that provides context to Claude Code about this SDK — key APIs, common patterns, and gotchas. When you open this repo in Claude Code, it automatically reads this file to give better assistance.
Visit our AssemblyAI API Documentation to get an overview of our models!
pip install -U assemblyaiBefore starting, you need to set the API key. If you don't have one yet, sign up for one!
import assemblyai as aai
# set the API key
aai.settings.api_key = f"{ASSEMBLYAI_API_KEY}"Transcribe a local audio file
import assemblyai as aai
aai.settings.base_url = "https://api.assemblyai.com"
aai.settings.api_key = "YOUR_API_KEY"
audio_file = "./example.mp3"
config = aai.TranscriptionConfig(
speech_models=["universal-3-pro", "universal-2"],
language_detection=True,
speaker_labels=True,
)
transcript = aai.Transcriber().transcribe(audio_file, config=config)
if transcript.status == aai.TranscriptStatus.error:
raise RuntimeError(f"Transcription failed: {transcript.error}")
print(f"\nFull Transcript:\n\n{transcript.text}")Transcribe an URL
import assemblyai as aai
aai.settings.base_url = "https://api.assemblyai.com"
aai.settings.api_key = "YOUR_API_KEY"
audio_file = "https://assembly.ai/wildfires.mp3"
config = aai.TranscriptionConfig(
speech_models=["universal-3-pro", "universal-2"],
language_detection=True,
speaker_labels=True,
)
transcript = aai.Transcriber().transcribe(audio_file, config=config)
if transcript.status == aai.TranscriptStatus.error:
raise RuntimeError(f"Transcription failed: {transcript.error}")
print(f"\nFull Transcript:\n\n{transcript.text}")Transcribe binary data
import assemblyai as aai
aai.settings.base_url = "https://api.assemblyai.com"
aai.settings.api_key = "YOUR_API_KEY"
transcriber = aai.Transcriber()
# Binary data is supported directly:
transcript = transcriber.transcribe(data)
# Or: Upload data separately:
upload_url = transcriber.upload_file(data)
transcript = transcriber.transcribe(upload_url)Export subtitles of an audio file
import assemblyai as aai
aai.settings.api_key = "<YOUR_API_KEY>"
# audio_file = "./local_file.mp3"
audio_file = "https://assembly.ai/wildfires.mp3"
config = aai.TranscriptionConfig(
speech_models=["universal-3-pro", "universal-2"],
language_detection=True
)
transcript = aai.Transcriber(config=config).transcribe(audio_file)
if transcript.status == "error":
raise RuntimeError(f"Transcription failed: {transcript.error}")
srt = transcript.export_subtitles_srt(
# Optional: Customize the maximum number of characters per caption
chars_per_caption=32
)
with open(f"transcript_{transcript.id}.srt", "w") as srt_file:
srt_file.write(srt)
# vtt = transcript.export_subtitles_vtt()
# with open(f"transcript_{transcript_id}.vtt", "w") as vtt_file:
# vtt_file.write(vtt)List all sentences and paragraphs
import assemblyai as aai
aai.settings.api_key = "<YOUR_API_KEY>"
# audio_file = "./local_file.mp3"
audio_file = "https://assembly.ai/wildfires.mp3"
config = aai.TranscriptionConfig(
speech_models=["universal-3-pro", "universal-2"],
language_detection=True
)
transcript = aai.Transcriber(config=config).transcribe(audio_file)
if transcript.status == "error":
raise RuntimeError(f"Transcription failed: {transcript.error}")
sentences = transcript.get_sentences()
for sentence in sentences:
print(sentence.text)
print()
paragraphs = transcript.get_paragraphs()
for paragraph in paragraphs:
print(paragraph.text)
print()Search for words in a transcript
import assemblyai as aai
aai.settings.api_key = "<YOUR_API_KEY>"
# audio_file = "./local_file.mp3"
audio_file = "https://assembly.ai/wildfires.mp3"
config = aai.TranscriptionConfig(
speech_models=["universal-3-pro", "universal-2"],
language_detection=True
)
transcript = aai.Transcriber(config=config).transcribe(audio_file)
if transcript.status == "error":
raise RuntimeError(f"Transcription failed: {transcript.error}")
# Set the words you want to search for
words = ["foo", "bar", "foo bar", "42"]
matches = transcript.word_search(words)
for match in matches:
print(f"Found '{match.text}' {match.count} times in the transcript")Add custom spellings on a transcript
import assemblyai as aai
aai.settings.api_key = "<YOUR_API_KEY>"
# audio_file = "./local_file.mp3"
audio_file = "https://assembly.ai/wildfires.mp3"
config = aai.TranscriptionConfig(
speech_models=["universal-3-pro", "universal-2"],
language_detection=True
)
config.set_custom_spelling(
{
"Gettleman": ["gettleman"],
"SQL": ["Sequel"],
}
)
transcript = aai.Transcriber(config=config).transcribe(audio_file)
if transcript.status == "error":
raise RuntimeError(f"Transcription failed: {transcript.error}")
print(transcript.text)Upload a file
import assemblyai as aai
transcriber = aai.Transcriber()
upload_url = transcriber.upload_file(data)Delete a transcript
import assemblyai as aai
aai.settings.api_key = "<YOUR_API_KEY>"
# audio_file = "./local_file.mp3"
audio_file = "https://assembly.ai/wildfires.mp3"
config = aai.TranscriptionConfig(
speech_models=["universal-3-pro", "universal-2"],
language_detection=True
)
transcript = aai.Transcriber(config=config).transcribe(audio_file)
if transcript.status == "error":
raise RuntimeError(f"Transcription failed: {transcript.error}")
print(transcript.text)
transcript.delete_by_id(transcript.id)
transcript = aai.Transcript.get_by_id(transcript.id)
print(transcript.text)List transcripts
This returns a page of transcripts you created.
import assemblyai as aai
transcriber = aai.Transcriber()
page = transcriber.list_transcripts()
print(page.page_details) # Page details
print(page.transcripts) # List of transcriptsYou can apply filter parameters:
params = aai.ListTranscriptParameters(
limit=3,
status=aai.TranscriptStatus.completed,
)
page = transcriber.list_transcripts(params)You can also paginate over all pages by using the helper property before_id_of_prev_url.
The prev_url always points to a page with older transcripts. If you extract the before_id
of the prev_url query parameters, you can paginate over all pages from newest to oldest.
transcriber = aai.Transcriber()
params = aai.ListTranscriptParameters()
page = transcriber.list_transcripts(params)
while page.page_details.before_id_of_prev_url is not None:
params.before_id = page.page_details.before_id_of_prev_url
page = transcriber.list_transcripts(params)PII Redact a transcript
import assemblyai as aai
aai.settings.api_key = "<YOUR_API_KEY>"
# audio_file = "./local_file.mp3"
audio_file = "https://assembly.ai/wildfires.mp3"
config = aai.TranscriptionConfig(
speech_models=["universal-3-pro", "universal-2"],
language_detection=True,
).set_redact_pii(
policies=[
aai.PIIRedactionPolicy.person_name,
aai.PIIRedactionPolicy.organization,
aai.PIIRedactionPolicy.occupation,
],
substitution=aai.PIISubstitutionPolicy.hash,
)
transcript = aai.Transcriber().transcribe(audio_file, config)
print(f"Transcript ID:", transcript.id)
print(transcript.text)To request a copy of the original audio file with the redacted information "beeped" out, set redact_pii_audio=True in the config.
Once the Transcript object is returned, you can access the URL of the redacted audio file with get_redacted_audio_url, or save the redacted audio directly to disk with save_redacted_audio.
import assemblyai as aai
aai.settings.api_key = "<YOUR_API_KEY>"
# audio_file = "./local_file.mp3"
audio_file = "https://assembly.ai/wildfires.mp3"
config = aai.TranscriptionConfig(
speech_models=["universal-3-pro", "universal-2"],
language_detection=True,
).set_redact_pii(
policies=[
aai.PIIRedactionPolicy.person_name,
aai.PIIRedactionPolicy.organization,
aai.PIIRedactionPolicy.occupation,
],
substitution=aai.PIISubstitutionPolicy.hash,
redact_audio=True
)
transcript = aai.Transcriber().transcribe(audio_file, config)
print(f"Transcript ID:", transcript.id)
print(transcript.text)
print(transcript.get_redacted_audio_url())Summarize the content of a transcript over time
import assemblyai as aai
aai.settings.api_key = "<YOUR_API_KEY>"
# audio_file = "./local_file.mp3"
audio_file = "https://assembly.ai/wildfires.mp3"
config = aai.TranscriptionConfig(
speech_models=["universal-3-pro", "universal-2"],
language_detection=True,
auto_chapters=True
)
transcript = aai.Transcriber().transcribe(audio_file, config)
print(f"Transcript ID:", transcript.id)
for chapter in transcript.chapters:
print(f"{chapter.start}-{chapter.end}: {chapter.headline}")Summarize the content of a transcript
import assemblyai as aai
aai.settings.api_key = "<YOUR_API_KEY>"
# audio_file = "./local_file.mp3"
audio_file = "https://assembly.ai/wildfires.mp3"
config = aai.TranscriptionConfig(
speech_models=["universal-3-pro", "universal-2"],
language_detection=True,
summarization=True,
summary_model=aai.SummarizationModel.informative,
summary_type=aai.SummarizationType.bullets
)
transcript = aai.Transcriber().transcribe(audio_file, config)
print(f"Transcript ID: ", transcript.id)
print(transcript.summary)By default, the summarization model will be informative and the summarization type will be bullets. Read more about summarization models and types here.
To change the model and/or type, pass additional parameters to the TranscriptionConfig:
config=aai.TranscriptionConfig(
summarization=True,
summary_model=aai.SummarizationModel.catchy,
summary_type=aai.SummarizationType.headline
)Detect sensitive content in a transcript
import assemblyai as aai
aai.settings.api_key = "<YOUR_API_KEY>"
# audio_file = "./local_file.mp3"
audio_file = "https://assembly.ai/wildfires.mp3"
config = aai.TranscriptionConfig(
speech_models=["universal-3-pro", "universal-2"],
language_detection=True,
content_safety=True
)
transcript = aai.Transcriber().transcribe(audio_file, config)
print(f"Transcript ID:", transcript.id)
for result in transcript.content_safety.results:
print(result.text)
print(f"Timestamp: {result.timestamp.start} - {result.timestamp.end}")
# Get category, confidence, and severity.
for label in result.labels:
print(f"{label.label} - {label.confidence} - {label.severity}") # content safety category
# Get the confidence of the most common labels in relation to the entire audio file.
for label, confidence in transcript.content_safety.summary.items():
print(f"{confidence * 100}% confident that the audio contains {label}")
# Get the overall severity of the most common labels in relation to the entire audio file.
for label, severity_confidence in transcript.content_safety.severity_score_summary.items():
print(f"{severity_confidence.low * 100}% confident that the audio contains low-severity {label}")
print(f"{severity_confidence.medium * 100}% confident that the audio contains medium-severity {label}")
print(f"{severity_confidence.high * 100}% confident that the audio contains high-severity {label}")Read more about the content safety categories.
By default, the content safety model will only include labels with a confidence greater than 0.5 (50%). To change this, pass content_safety_confidence (as an integer percentage between 25 and 100, inclusive) to the TranscriptionConfig:
config=aai.TranscriptionConfig(
content_safety=True,
content_safety_confidence=80, # only include labels with a confidence greater than 80%
)Analyze the sentiment of sentences in a transcript
import assemblyai as aai
aai.settings.api_key = "<YOUR_API_KEY>"
# audio_file = "./local_file.mp3"
audio_file = "https://assembly.ai/wildfires.mp3"
config = aai.TranscriptionConfig(
speech_models=["universal-3-pro", "universal-2"],
language_detection=True,
sentiment_analysis=True
)
transcript = aai.Transcriber().transcribe(audio_file, config)
print(f"Transcript ID:", transcript.id)
for sentiment_result in transcript.sentiment_analysis:
print(sentiment_result.text)
print(sentiment_result.sentiment) # POSITIVE, NEUTRAL, or NEGATIVE
print(sentiment_result.confidence)
print(f"Timestamp: {sentiment_result.start} - {sentiment_result.end}")If speaker_labels is also enabled, then each sentiment analysis result will also include a speaker field.
# ...
config = aai.TranscriptionConfig(sentiment_analysis=True, speaker_labels=True)
# ...
for sentiment_result in transcript.sentiment_analysis:
print(sentiment_result.speaker)Identify entities in a transcript
import assemblyai as aai
aai.settings.api_key = "<YOUR_API_KEY>"
# audio_file = "./local_file.mp3"
audio_file = "https://assembly.ai/wildfires.mp3"
config = aai.TranscriptionConfig(
speech_models=["universal-3-pro", "universal-2"],
language_detection=True,
entity_detection=True
)
transcript = aai.Transcriber().transcribe(audio_file, config)
print(f"Transcript ID:", transcript.id)
for entity in transcript.entities:
print(entity.text)
print(entity.entity_type)
print(f"Timestamp: {entity.start} - {entity.end}\n")Detect topics in a transcript (IAB Classification)
import assemblyai as aai
aai.settings.api_key = "<YOUR_API_KEY>"
# audio_file = "./local_file.mp3"
audio_file = "https://assembly.ai/wildfires.mp3"
config = aai.TranscriptionConfig(
speech_models=["universal-3-pro", "universal-2"],
language_detection=True,
iab_categories=True
)
transcript = aai.Transcriber().transcribe(audio_file, config)
print(f"Transcript ID:", transcript.id)
# Get the parts of the transcript that were tagged with topics
for result in transcript.iab_categories.results:
print(result.text)
print(f"Timestamp: {result.timestamp.start} - {result.timestamp.end}")
for label in result.labels:
print(f"{label.label} ({label.relevance})")
# Get a summary of all topics in the transcript
for topic, relevance in transcript.iab_categories.summary.items():
print(f"Audio is {relevance * 100}% relevant to {topic}")Identify important words and phrases in a transcript
import assemblyai as aai
aai.settings.api_key = "<YOUR_API_KEY>"
# audio_file = "./local_file.mp3"
audio_file = "https://assembly.ai/wildfires.mp3"
config = aai.TranscriptionConfig(
speech_models=["universal-3-pro", "universal-2"],
language_detection=True,
auto_highlights=True
)
transcript = aai.Transcriber().transcribe(audio_file, config)
print(f"Transcript ID:", transcript.id)
for result in transcript.auto_highlights.results:
print(f"Highlight: {result.text}, Count: {result.count}, Rank: {result.rank}, Timestamps: {result.timestamps}")Real-time speech-to-text via WebSocket against the u3-rt-pro model. The SDK ships two clients with identical option/event/handler surfaces — StreamingClient (threaded) and AsyncStreamingClient (asyncio). Pick whichever fits your codebase.
Handler contract: every handler is called as handler(client, event). Plain functions and async def functions both work; AsyncStreamingClient awaits async handlers inline on the read task, so don't block — use asyncio.create_task(...) if you need concurrent work.
Read more about the streaming service.
Stream a local file (sync)
import assemblyai as aai
from assemblyai.streaming.v3 import (
BeginEvent, StreamingClient, StreamingClientOptions, StreamingError,
StreamingEvents, StreamingParameters, TerminationEvent, TurnEvent,
)
def on_begin(client, event: BeginEvent):
print(f"Session started: {event.id}")
def on_turn(client, event: TurnEvent):
print(f"{event.transcript} (end_of_turn={event.end_of_turn})")
def on_terminated(client, event: TerminationEvent):
print(f"Done: {event.audio_duration_seconds}s of audio processed")
def on_error(client, error: StreamingError):
print(f"Error: {error} (code={error.code})")
client = StreamingClient(StreamingClientOptions(api_key="<YOUR_API_KEY>"))
client.on(StreamingEvents.Begin, on_begin)
client.on(StreamingEvents.Turn, on_turn)
client.on(StreamingEvents.Termination, on_terminated)
client.on(StreamingEvents.Error, on_error)
client.connect(StreamingParameters(
sample_rate=16000, speech_model="u3-rt-pro", format_turns=True,
))
try:
client.stream(aai.extras.stream_file(filepath="audio.wav", sample_rate=16000))
finally:
client.disconnect(terminate=True)Stream your microphone (sync)
MicrophoneStream requires PyAudio:
pip install -U "assemblyai[extras]"import assemblyai as aai
from assemblyai.streaming.v3 import (
StreamingClient, StreamingClientOptions, StreamingEvents, StreamingParameters,
)
def on_turn(client, event):
print(f"{event.transcript} (end_of_turn={event.end_of_turn})")
client = StreamingClient(StreamingClientOptions(api_key="<YOUR_API_KEY>"))
client.on(StreamingEvents.Turn, on_turn)
client.connect(StreamingParameters(sample_rate=16000, speech_model="u3-rt-pro"))
try:
client.stream(aai.extras.MicrophoneStream(sample_rate=16000))
finally:
client.disconnect(terminate=True)Stream a local file (async)
AsyncStreamingClient mirrors StreamingClient with async methods. It's safe to use as an async context manager — disconnect() runs on block exit even if user code raises. Don't pass extras.stream_file directly (it uses blocking time.sleep); pace from an async generator instead.
import asyncio
from assemblyai.streaming.v3 import (
AsyncStreamingClient, StreamingClientOptions, StreamingEvents, StreamingParameters,
)
async def stream_file_async(path: str, sample_rate: int, chunk_duration: float = 0.3):
bytes_per_chunk = int(sample_rate * chunk_duration) * 2
with open(path, "rb") as f:
while chunk := f.read(bytes_per_chunk):
yield chunk
await asyncio.sleep(chunk_duration)
async def on_turn(client, event):
print(f"{event.transcript} (end_of_turn={event.end_of_turn})")
async def main():
async with AsyncStreamingClient(StreamingClientOptions(api_key="<YOUR_API_KEY>")) as client:
client.on(StreamingEvents.Turn, on_turn)
await client.connect(StreamingParameters(
sample_rate=16000, speech_model="u3-rt-pro", format_turns=True,
))
await client.stream(stream_file_async("audio.wav", 16000))
asyncio.run(main())Handle errors
Server-side errors arrive on the Error event rather than being raised. The handler receives a StreamingError (an Exception subclass) with .code: int | None — not the wire ErrorEvent class.
StreamingErrorCodes is a dict[int, str] mapping wire codes to human-readable messages. Use .get(...) for lookup:
from assemblyai.streaming.v3 import StreamingErrorCodes
def on_error(client, error):
message = StreamingErrorCodes.get(error.code, str(error))
print(f"Streaming error {error.code}: {message}")Common codes: 4001 Not Authorized, 4002 Insufficient Funds, 4029 Client sent audio too fast, 4031 Session idle for too long.
Change settings mid-session
set_params updates an active session. Typical use: enable turn formatting (punctuation, casing) only on confirmed end-of-turn so partial transcripts stay raw:
from assemblyai.streaming.v3 import StreamingSessionParameters
def on_turn(client, event):
if event.end_of_turn and not event.turn_is_formatted:
client.set_params(StreamingSessionParameters(format_turns=True))For voice agents, force_endpoint() flushes the current turn — useful when an external signal (UI button, barge-in detection) determines the user has stopped speaking before VAD does:
client.force_endpoint() # ends the current turn immediatelyTemporary tokens for browser / edge clients
Don't ship your API key to browsers. Mint a short-lived token server-side and pass it to the client.
Sync server (Flask / WSGI / scripts):
client = StreamingClient(StreamingClientOptions(api_key="<YOUR_API_KEY>"))
token = client.create_temporary_token(expires_in_seconds=60)
# Send `token` to the browser, which connects with options(token=token).Async server (FastAPI / asyncio): always wrap in async with even though you don't call connect() — create_temporary_token lazily opens an httpx.AsyncClient pool. The context manager closes it on exit; without it you leak a pool every request.
from fastapi import FastAPI
from assemblyai.streaming.v3 import AsyncStreamingClient, StreamingClientOptions
app = FastAPI()
MASTER_KEY = "<YOUR_API_KEY>"
@app.get("/streaming-token")
async def streaming_token():
async with AsyncStreamingClient(StreamingClientOptions(api_key=MASTER_KEY)) as client:
return {"token": await client.create_temporary_token(expires_in_seconds=60)}Browser / edge client: pass the token via StreamingClientOptions(token=...):
client = StreamingClient(StreamingClientOptions(token="<TOKEN_FROM_SERVER>"))
client.connect(StreamingParameters(sample_rate=16000, speech_model="u3-rt-pro"))You'll find the Settings class with all default values in types.py.
Change the default timeout and polling interval
import assemblyai as aai
aai.settings.base_url = "https://api.assemblyai.com"
aai.settings.api_key = "YOUR_API_KEY"
# The HTTP timeout in seconds for general requests, default is 30.0
aai.settings.http_timeout = 60.0
# The polling interval in seconds for long-running requests, default is 3.0
aai.settings.polling_interval = 10.0Visit our Playground to try our all of our Speech AI models and LeMUR for free:
When no TranscriptionConfig is being passed to the Transcriber or its methods, it will use a default instance of a TranscriptionConfig.
If you would like to re-use the same TranscriptionConfig for all your transcriptions,
you can set it on the Transcriber directly:
config = aai.TranscriptionConfig(punctuate=False, format_text=False)
transcriber = aai.Transcriber(config=config)
# will use the same config for all `.transcribe*(...)` operations
transcriber.transcribe("https://example.org/audio.wav")You can override the default configuration later via the .config property of the Transcriber:
transcriber = aai.Transcriber()
# override the `Transcriber`'s config with a new config
transcriber.config = aai.TranscriptionConfig(punctuate=False, format_text=False)In case you want to override the Transcriber's configuration for a specific operation with a different one, you can do so via the config parameter of a .transcribe*(...) method:
config = aai.TranscriptionConfig(punctuate=False, format_text=False)
# set a default configuration
transcriber = aai.Transcriber(config=config)
transcriber.transcribe(
"https://example.com/audio.mp3",
# overrides the above configuration on the `Transcriber` with the following
config=aai.TranscriptionConfig(speech_models=["universal-3-pro", "universal-2"], multichannel=True, disfluencies=True)
)Currently, the SDK provides two ways to transcribe audio files.
The synchronous approach halts the application's flow until the transcription has been completed.
The asynchronous approach allows the application to continue running while the transcription is being processed. The caller receives a concurrent.futures.Future object which can be used to check the status of the transcription at a later time.
You can identify those two approaches by the _async suffix in the Transcriber's method name (e.g. transcribe vs transcribe_async).
There are two ways of accessing the HTTP status code:
- All custom AssemblyAI Error classes have a
status_codeattribute. - The latest HTTP response is stored in
aai.Client.get_default().latest_responseafter every API call. This approach works also if no Exception is thrown.
transcriber = aai.Transcriber()
# Option 1: Catch the error
try:
transcript = transcriber.submit("./example.mp3")
except aai.AssemblyAIError as e:
print(e.status_code)
# Option 2: Access the latest response through the client
client = aai.Client.get_default()
try:
transcript = transcriber.submit("./example.mp3")
except:
print(client.last_response)
print(client.last_response.status_code)By default we poll the Transcript's status each 3s. In case you would like to adjust that interval:
import assemblyai as aai
aai.settings.base_url = "https://api.assemblyai.com"
aai.settings.api_key = "YOUR_API_KEY"
aai.settings.polling_interval = 1.0If you previously created a transcript, you can use its ID to retrieve it later.
import assemblyai as aai
aai.settings.base_url = "https://api.assemblyai.com"
aai.settings.api_key = "YOUR_API_KEY"
transcript = aai.Transcript.get_by_id("<TRANSCRIPT_ID>")
print(transcript.id)
print(transcript.text)You can also retrieve multiple existing transcripts and combine them into a single TranscriptGroup object. This allows you to perform operations on the transcript group as a single unit.
import assemblyai as aai
aai.settings.base_url = "https://api.assemblyai.com"
aai.settings.api_key = "YOUR_API_KEY"
transcript_group = aai.TranscriptGroup.get_by_ids(["<TRANSCRIPT_ID_1>", "<TRANSCRIPT_ID_2>"])Both Transcript.get_by_id and TranscriptGroup.get_by_ids have asynchronous counterparts, Transcript.get_by_id_async and TranscriptGroup.get_by_ids_async, respectively. These functions immediately return a Future object, rather than blocking until the transcript(s) are retrieved.
See the above section on Synchronous vs Asynchronous for more information.
