This blog post walks through a full pipeline that takes audio content in any language and produces subtitles in a target language with minimal code adjustments, using Japanese-to-Chinese as the example. All the code is written in Python and was generated by ChatGPT o1. (Actually, this blog post was generated by ChatGPT o1 and 4o as well.)

  1. Extract/Transcribe Audio: Use OpenAI’s Whisper API to convert audio into text (with timestamps).
  2. Generate an SRT: Convert those timestamped segments into an SRT file.
  3. Translate the SRT from Japanese to Chinese: Use GPT-4o for chunk-based, context-aware translations.
  4. Embed subtitles into video: Use ffmpeg to embed subtitles into a video.

Along the way, the translation step lets you review the auto-generated plot summary and prints periodic sample translations, so you can spot-check the output and keep the final subtitle track high quality.

Prerequisites

  • Python 3.9+
  • Pip-installed packages (see the one-line install below):
    • openai>=1.0.0
    • pydub
    • tqdm
    • srt
  • ffmpeg available on your PATH (pydub uses it to read and split audio, and step 4 uses it to embed subtitles)
  • An OpenAI API key, which can be obtained by signing up at OpenAI’s platform, and then setting it in your environment:
    export OPENAI_API_KEY="..."
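
The Python dependencies can be installed in one go (these are the packages listed above; the SRT helper is the package imported as "import srt" in step 2):

pip install "openai>=1.0.0" pydub tqdm srt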
    

1. Transcribing Audio With transcribe_audio.py

Your audio file will often be too large (>25 MB) for the OpenAI Whisper API, so we split it into smaller chunks (e.g. 10 minutes each) and transcribe each chunk with openai.audio.transcriptions.create.

Below is an example script: transcribe_audio.py.

#!/usr/bin/env python3

import argparse
import os
import json
from pydub import AudioSegment
import openai
from tqdm import tqdm

def split_audio(input_file: str, chunk_length_minutes: float) -> list[str]:
    """
    Splits the input audio file into multiple chunks of `chunk_length_minutes`.
    Saves each chunk as a separate .mp3 file in the same directory as the input.
    Returns a list of chunk file paths.
    """
    audio = AudioSegment.from_file(input_file)
    chunk_length_ms = int(chunk_length_minutes * 60 * 1000)

    base_name = os.path.splitext(os.path.basename(input_file))[0]
    directory = os.path.dirname(os.path.abspath(input_file))
    
    chunks = []
    start = 0
    end = len(audio)
    chunk_count = 0

    while start < end:
        chunk_count += 1
        chunk_audio = audio[start:start + chunk_length_ms]
        chunk_path = os.path.join(directory, f"{base_name}_chunk_{chunk_count}.mp3")
        chunk_audio.export(chunk_path, format="mp3")
        chunks.append(chunk_path)
        start += chunk_length_ms

    return chunks

def transcribe_file(file_path: str, model: str = "whisper-1", language: str = "ja"):
    """
    Transcribes a single audio file using the OpenAI Whisper API (files up to 25 MB).
    Returns the verbose_json response as a plain Python dict.
    """
    if not openai.api_key:
        raise ValueError("No OPENAI_API_KEY found in environment variables.")

    with open(file_path, "rb") as audio_file:
        # openai>=1.0.0 exposes transcription as openai.audio.transcriptions.create;
        # the pre-1.0 openai.Audio.transcribe call no longer exists.
        response = openai.audio.transcriptions.create(
            file=audio_file,
            model=model,
            language=language,
            response_format="verbose_json",
            timestamp_granularities=["segment"],
        )
    # Convert the response object to a plain dict so it can be serialized to JSON.
    return response.model_dump()

def main():
    parser = argparse.ArgumentParser(
        description="Split a large audio file into smaller chunks, then transcribe each chunk with OpenAI Whisper."
    )
    parser.add_argument("--input", required=True, help="Path to the input audio file (e.g., .mp3, .m4a, .wav).")
    parser.add_argument("--chunk_minutes", type=float, default=10.0,
                        help="Length (in minutes) of each chunk. Default: 10 minutes.")
    parser.add_argument("--model", default="whisper-1", help="OpenAI Whisper model to use (default: whisper-1).")
    parser.add_argument("--language", default="ja", help="Language in ISO-639-1 code (e.g., 'ja' for Japanese).")
    parser.add_argument("--output_json", default="transcript.json",
                        help="Path to save the combined transcription JSON.")
    args = parser.parse_args()

    openai.api_key = os.environ.get("OPENAI_API_KEY")
    if not openai.api_key:
        raise ValueError("No OPENAI_API_KEY found in environment variables.")

    # 1) Split the audio
    print(f"Splitting '{args.input}' into chunks of {args.chunk_minutes} minute(s)...")
    chunk_paths = split_audio(args.input, args.chunk_minutes)
    print(f"Created {len(chunk_paths)} chunk file(s).")

    # 2) Transcribe each chunk
    combined_results = []
    for chunk_path in tqdm(chunk_paths, desc="Transcribing chunks"):
        try:
            result = transcribe_file(chunk_path, model=args.model, language=args.language)
            combined_results.append({
                "chunk_file": chunk_path,
                "transcription": result
            })
        except openai.OpenAIError as e:
            print(f"[ERROR] Failed to transcribe {chunk_path}: {e}")

    # 3) Save the combined results to JSON
    with open(args.output_json, "w", encoding="utf-8") as f:
        json.dump(combined_results, f, ensure_ascii=False, indent=2)
    print(f"\nDone! Combined transcription saved to {args.output_json}")

if __name__ == "__main__":
    main()
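
For reference, the combined JSON written by this script looks roughly like the sketch below. The values are illustrative; each verbose_json transcription also carries other fields (language, full text, per-segment metadata, etc.) that the next step simply ignores:

[
  {
    "chunk_file": "/path/to/input_chunk_1.mp3",
    "transcription": {
      "duration": 600.0,
      "segments": [
        {"start": 0.0, "end": 4.2, "text": "皆さん、こんにちは"},
        {"start": 4.2, "end": 9.8, "text": "今日のテーマは..."}
      ]
    }
  }
]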

2. Generating SRT With generate_srt.py

Once you have the transcript JSON from the previous step, you typically have one or more chunks, each with its own transcription whose timestamps restart at zero. To form a single SRT, we gather all segments, offset each chunk's timestamps by the cumulative duration of the preceding chunks (so, for example, a segment at 12.3 s into the second 10-minute chunk becomes 612.3 s), and write them out in chronological order.

#!/usr/bin/env python3

import srt
from datetime import timedelta
import json
import argparse
import os

def segments_to_srt(segments, out_path="output_subtitles.srt"):
    """
    segments = [
      {"start": 1.23, "end": 5.67, "text": "some line"},
      {"start": 5.67, "end": 10.00, "text": "next line"},
      ...
    ]
    Writes them to an SRT file at out_path.
    """
    subtitles = []
    for i, seg in enumerate(segments, start=1):
        start_td = timedelta(seconds=seg["start"])
        end_td = timedelta(seconds=seg["end"])
        subtitles.append(
            srt.Subtitle(index=i, start=start_td, end=end_td, content=seg["text"])
        )

    srt_text = srt.compose(subtitles)
    with open(out_path, "w", encoding="utf-8") as f:
        f.write(srt_text)
    print(f"SRT saved to {out_path}")

def load_transcript_json(json_path):
    """
    Loads the JSON file from transcribe_audio.py.
    Each chunk's timestamps restart at zero, so every segment is offset by the
    cumulative duration of the chunks that precede it.
    Returns a merged, chronologically sorted list of segments.
    """
    with open(json_path, "r", encoding="utf-8") as f:
        chunked_data = json.load(f)

    final_segments = []
    offset = 0.0
    for chunk_info in chunked_data:
        transcription = chunk_info["transcription"]
        for seg in transcription["segments"]:
            final_segments.append({
                "start": seg["start"] + offset,
                "end": seg["end"] + offset,
                "text": seg["text"],
            })
        # verbose_json reports the duration (in seconds) of the chunk's audio.
        offset += transcription.get("duration", 0.0)

    final_segments.sort(key=lambda s: s["start"])
    return final_segments

def main():
    parser = argparse.ArgumentParser(
        description="Generate a single SRT from the JSON output of chunked Whisper transcriptions."
    )
    parser.add_argument("--input_json", required=True, help="Path to the combined transcript JSON")
    parser.add_argument("--output_srt", default="output_subtitles.srt", help="Path for the generated SRT")
    args = parser.parse_args()

    # Load & merge segments
    segments = load_transcript_json(args.input_json)

    # Convert segments to a single SRT
    segments_to_srt(segments, out_path=args.output_srt)

if __name__ == "__main__":
    main()
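
The resulting file is a standard SRT: a block index, a time range, the text, and a blank line between blocks. An illustrative excerpt (timestamps made up):

1
00:00:01,230 --> 00:00:05,670
皆さん、こんにちは

2
00:00:05,670 --> 00:00:10,000
今日のテーマは...

Before moving on, you can optionally sanity-check the generated SRT with the same srt library, for instance flagging overlapping blocks. A minimal sketch, assuming the output file name used above:

import srt

with open("output_subtitles.srt", "r", encoding="utf-8") as f:
    subs = list(srt.parse(f.read()))

overlaps = 0
for prev, cur in zip(subs, subs[1:]):
    # Flag blocks that start before the previous block has ended.
    if cur.start < prev.end:
        overlaps += 1
        print(f"Overlap: block {prev.index} ends at {prev.end}, block {cur.index} starts at {cur.start}")

print(f"Checked {len(subs)} blocks, found {overlaps} overlap(s).")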

3. Translating Japanese SRT to Chinese With translate_srt_jp_to_cn.py

With the Japanese SRT in hand, you can run a chunk-based translation on it. The script divides the subtitle text into manageable blocks and translates them with GPT-4o. Below is a detailed script that demonstrates:

  • Async Summarization: For context-aware translation, the script first summarizes the entire subtitle text.
  • Chunk-based Translation: Subtitles are processed in small, manageable blocks, several per request, with the requests issued concurrently.
  • User Interaction: Users can review and edit the generated summaries for accuracy and contextual relevance.
  • Bilingual Output: The final result includes both the original and translated text for reference, ensuring a comprehensive subtitle file.

Below is translate_srt_jp_to_cn.py:
#!/usr/bin/env python3

import asyncio
import openai
import argparse
import os
from tqdm import tqdm

###############################################################################
#                              ASYNC SUMMARIZATION
###############################################################################

async def async_summarize_srt(srt_text: str) -> str:
    """
    Asynchronously summarizes the given SRT text using GPT-4o-2024-11-20.
    Returns the summarized text as a string.
    """
    # The client call itself is synchronous, so run it in a worker thread to
    # avoid blocking the event loop.
    response = await asyncio.to_thread(
        openai.chat.completions.create,
        model="gpt-4o-2024-11-20",
        messages=[
            {
                "role": "system",
                "content": (
                    "You are a helpful assistant that summarizes a movie or show's subtitle text "
                    "into a concise description of the overall context, plot, and key topics."
                ),
            },
            {
                "role": "user",
                "content": (
                    "Please summarize the following subtitle text. Include essential plot points, "
                    "characters, and overall style or tone. Keep it concise:\n\n"
                    f"{srt_text}"
                ),
            },
        ],
        temperature=0.5,
    )
    return response.choices[0].message.content.strip()


async def async_summarize_two_parts(srt_full_text: str) -> str:
    """
    Split the SRT text into two halves, do a two-step summarization:
      1) Summarize Part 1 -> summary1
      2) Combine (summary1 + Part 2) -> final_summary
    """
    # Naive split by character length
    half = len(srt_full_text) // 2
    part1 = srt_full_text[:half]
    part2 = srt_full_text[half:]

    # 1) Summarize first half
    print("Summarizing Part 1 (first half of text)...")
    summary1 = await async_summarize_srt(part1)
    print("--- Summary of Part 1 ---")
    print(summary1)
    print("-------------------------\n")

    # 2) Summarize (summary1 + Part 2)
    print("Summarizing (Summary1 + Part 2) to get the final summary...")
    combined_text = summary1 + "\n\n" + part2
    final_summary = await async_summarize_srt(combined_text)
    return final_summary


###############################################################################
#                           ASYNC TRANSLATION UTILS
###############################################################################

async def async_translate_batch(
    block_texts: list[str], 
    summary_context: str
) -> list[str]:
    """
    Asynchronously translates a batch of Japanese subtitle blocks into Chinese 
    using GPT-4o-2024-11-20, with summary_context as background knowledge.

    Returns a list of translations, one for each text in block_texts.
    """
    delimiter = "|||"
    joined_text = f"\n{delimiter}\n".join(block_texts)

    user_prompt = (
        "You have the following background/context for the entire show:\n\n"
        f"{summary_context}\n\n"
        "Now, translate the following Japanese subtitle blocks into Chinese. "
        f"Each block is separated by the delimiter '{delimiter}'. "
        "Output them in the same order, separated by that same delimiter, and do not add extra commentary.\n\n"
        f"{joined_text}"
    )

    # Run the synchronous client call in a worker thread so several chunks can
    # be translated concurrently without blocking the event loop.
    response = await asyncio.to_thread(
        openai.chat.completions.create,
        model="gpt-4o-2024-11-20",
        messages=[
            {
                "role": "system",
                "content": (
                    "You are a helpful assistant that accurately translates "
                    "Japanese text into Chinese, considering the background context. "
                    "Preserve meaning, style, and tone."
                ),
            },
            {
                "role": "user",
                "content": user_prompt,
            },
        ],
        temperature=0.2,
    )

    full_translation = response.choices[0].message.content.strip()
    translated_blocks = [t.strip() for t in full_translation.split(delimiter)]
    return translated_blocks


async def async_process_chunk(
    chunk_index: int,
    block_indices: list[int],
    block_texts: list[str],
    summary_context: str,
    results_container: dict[int, str],
) -> None:
    """
    Translates the batch of `block_texts` (all from a single chunk) asynchronously.
    Stores each translation in `results_container[index] = translation_text`.
    """
    translations = await async_translate_batch(block_texts, summary_context)
    for i, trans in zip(block_indices, translations):
        results_container[i] = trans


###############################################################################
#                         MAIN TRANSLATION LOGIC
###############################################################################

async def process_srt_with_summary_and_async(
    input_file: str,
    output_file: str,
    chunk_size: int = 5,
    sample_interval: int = 10
):
    """
    1. Summarize the entire SRT text in two steps for context (async).
    2. Prompt user to confirm or edit that final summary.
    3. Translate SRT blocks in chunks (in parallel) using async, with the final summary as context.
    4. Save bilingual SRT.
    """

    # ------------------- Read entire SRT and parse blocks ---------------------
    with open(input_file, "r", encoding="utf-8") as f:
        lines = f.read().splitlines()
    srt_full_text = "\n".join(lines)

    blocks = []
    current_block = []
    for line in lines:
        if line.strip() == "":
            if current_block:
                blocks.append(current_block)
                current_block = []
        else:
            current_block.append(line)
    if current_block:
        blocks.append(current_block)

    # ------------------- Two-part Summarization (Async) -----------------------
    print("Performing two-part summarization of the entire SRT for context...")
    final_summary = await async_summarize_two_parts(srt_full_text)
    print("\n------ PROPOSED FINAL SUMMARY ------")
    print(final_summary)
    print("------ END SUMMARY ------\n")

    # Prompt user for confirmation or edits
    confirm = input("Do you want to use this final summary as is? (y/n) ")
    if confirm.lower().strip() != "y":
        print("Please type in the corrected/edited summary (or press Enter to keep it as is):")
        user_edited = input("> ").strip()
        if user_edited:
            final_summary = user_edited

    # ------------------- Prepare block placeholders & gather JP text ----------
    new_srt_blocks = []
    block_texts_jp = []
    for block in blocks:
        if len(block) < 3:
            # Malformed or no text
            new_srt_blocks.append(block)
            block_texts_jp.append("")
            continue

        index_line = block[0]
        time_line = block[1]
        subtitle_text_lines = block[2:]
        subtitle_jp = "\n".join(subtitle_text_lines)

        new_srt_blocks.append([index_line, time_line, subtitle_jp, None])
        block_texts_jp.append(subtitle_jp)

    print("\nTranslating with background context...\n")
    total_blocks = len(block_texts_jp)
    print(f"Total blocks to translate: {total_blocks}")

    # ------------------- Build async tasks for chunk parallelization ----------
    tasks = []
    results_container = {}  # {block_index: "translated text"}

    block_ranges = range(0, total_blocks, chunk_size)
    for start in block_ranges:
        end = start + chunk_size
        chunk = block_texts_jp[start:end]

        # Indices of non-empty blocks in this chunk
        chunk_indices = [i for i, txt in enumerate(chunk, start=start) if txt.strip() != ""]
        actual_texts = [block_texts_jp[i] for i in chunk_indices]

        if not actual_texts:
            continue

        # Create an async task for each chunk
        task = asyncio.create_task(
            async_process_chunk(
                chunk_index=start // chunk_size,
                block_indices=chunk_indices,
                block_texts=actual_texts,
                summary_context=final_summary,
                results_container=results_container
            )
        )
        tasks.append(task)

    # Run all chunk translations concurrently, with a progress bar
    for f in tqdm(asyncio.as_completed(tasks), total=len(tasks), desc="Translating chunks"):
        await f

    # Print a few sample translations so the user can spot-check quality
    chunk_count = (total_blocks + chunk_size - 1) // chunk_size
    for c in range(chunk_count):
        if (c+1) % sample_interval == 0:
            start = c * chunk_size
            end = start + chunk_size
            chunk_indices = [i for i in range(start, min(end, total_blocks)) if block_texts_jp[i].strip() != ""]
            if not chunk_indices:
                continue
            sample_ix = chunk_indices[-1]
            if sample_ix in results_container:
                jp_text = block_texts_jp[sample_ix]
                cn_text = results_container[sample_ix]
                print("\n--- Translation Sample ---")
                print(f"Block {sample_ix+1} Japanese:\n{jp_text}")
                print(f"Block {sample_ix+1} Chinese:\n{cn_text}")
                print("--- End Sample ---\n")

    # ------------------- Write out bilingual SRT ------------------------------
    with open(output_file, "w", encoding="utf-8") as out:
        for i, block in enumerate(new_srt_blocks):
            if len(block) == 4:
                jp_text = block[2]
                cn_text = results_container.get(i)
                if cn_text:
                    out.write(block[0] + "\n")  # index
                    out.write(block[1] + "\n")  # time
                    out.write(jp_text + "\n")  # original JP
                    out.write(cn_text + "\n")  # translated CN
                    out.write("\n")
                else:
                    for line in block[:3]:
                        out.write(line + "\n")
                    out.write("\n")
            else:
                for line in block:
                    out.write(line + "\n")
                out.write("\n")

    print(f"\nTranslation complete. Output saved to '{output_file}'.")


###############################################################################
#                                  ENTRY POINT
###############################################################################

def main():
    parser = argparse.ArgumentParser(
        description=(
            "Translate an SRT file from Japanese to Chinese using GPT-4o-2024-11-20 with:\n"
            "1) Two-part summarization for context.\n"
            "2) Async parallel chunked translation.\n"
            "3) Bilingual output.\n"
        ),
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument("--input", required=True, help="Path to the input Japanese SRT file")
    parser.add_argument("--output", required=True, help="Path to save the output bilingual SRT file")
    parser.add_argument("--chunk_size", type=int, default=5, help="Number of blocks to translate per chunk")
    parser.add_argument("--sample_interval", type=int, default=10, help="Print a sample translation every N chunks")

    args = parser.parse_args()

    openai.api_key = os.environ.get("OPENAI_API_KEY")
    if not openai.api_key:
        raise ValueError("No OPENAI_API_KEY found in environment variables.")

    asyncio.run(
        process_srt_with_summary_and_async(
            input_file=args.input,
            output_file=args.output,
            chunk_size=args.chunk_size,
            sample_interval=args.sample_interval,
        )
    )

if __name__ == "__main__":
    main()
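
Because the original Japanese line is written above its translation, each block in the output SRT is bilingual. An illustrative block (the index and timestamps come from the input SRT):

12
00:01:02,000 --> 00:01:05,500
今日はありがとうございました
今天非常感谢大家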

4. Embedding Subtitles Into the Video With ffmpeg

To embed the subtitles into a video, you can use ffmpeg, a powerful and versatile tool for video processing. The command below adds the translated SRT as a soft subtitle track without re-encoding the video or audio:

ffmpeg \
  -i input_video.mp4 \
  -i "big_subtitles_cn.srt" \
  -map 0:v -map 0:a -map 1:0 \
  -c copy -c:s mov_text \
  -metadata:s:s:0 language=zho \
  -disposition:s:0 default \
  output_video.mp4

Explanation of Parameters:

  • -i input_video.mp4: Specifies the input video file.
  • -i "big_subtitles_cn.srt": Specifies the SRT subtitle file.
  • -map 0:v -map 0:a -map 1:0: Maps the video, audio, and subtitle streams to the output file.
  • -c copy: Copies the video and audio streams without re-encoding them.
  • -c:s mov_text: Specifies the subtitle codec, compatible with MP4 files.
  • -metadata:s:s:0 language=zho: Sets the language metadata of the subtitle stream to Chinese (zho).
  • -disposition:s:0 default: Marks the subtitle stream as the default.

Tips:

  • Ensure the SRT file is properly formatted and synced with the video duration.
  • If you encounter issues with unsupported subtitle codecs, consider converting the SRT file to another format (e.g., ASS) or using a different container format like MKV; see the example below.
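
For example, a variant targeting an MKV container keeps the subtitles as plain SRT and skips the mov_text conversion (same input files as above):

ffmpeg \
  -i input_video.mp4 \
  -i "big_subtitles_cn.srt" \
  -map 0:v -map 0:a -map 1:0 \
  -c copy -c:s srt \
  -metadata:s:s:0 language=zho \
  -disposition:s:0 default \
  output_video.mkv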

Output:

Either command produces a new video file (output_video.mp4 or output_video.mkv, respectively) with embedded subtitles, ready for playback on most media players.
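
If you want to confirm that the subtitle stream made it into the output, ffprobe (installed alongside ffmpeg) can list it:

ffprobe -v error -select_streams s \
  -show_entries stream=index,codec_name:stream_tags=language \
  output_video.mp4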


Putting It All Together

Let’s put it all together and run the following commands. Note that transcribe_audio.py can take the video file directly, since pydub reads its audio track via ffmpeg:

python transcribe_audio.py \
  --input input_video.mp4 \
  --chunk_minutes 10 \
  --model whisper-1 \
  --language ja \
  --output_json transcript.json

python generate_srt.py \
  --input_json transcript.json \
  --output_srt output_subtitles.srt

python translate_srt_jp_to_cn.py \
  --input output_subtitles.srt \
  --output big_subtitles_cn.srt

ffmpeg \
  -i input_video.mp4 \
  -i "big_subtitles_cn.srt" \
  -map 0:v -map 0:a -map 1:0 \
  -c copy -c:s mov_text \
  -metadata:s:s:0 language=zho \
  -disposition:s:0 default \
  output_video.mp4

Conclusion

This end-to-end solution leverages OpenAI Whisper for speech-to-text and GPT-4o for high-quality translations. By chunking large audio files, generating accurate transcriptions, and translating contextually, you can produce coherent, accurate subtitles in Chinese from Japanese video content.

Feel free to adapt these scripts to your workflow. With these tools, you can localize Japanese content efficiently!