18 changes: 12 additions & 6 deletions .env.example
@@ -23,11 +23,11 @@ API_KEY=your_api_key_here
 
 # s3 Compatible Storage Env Vars
 #
-#S3_ACCESS_KEY=your_access_key
-#S3_SECRET_KEY=your_secret_key
-#S3_ENDPOINT_URL=https://your-endpoint-url
-#S3_REGION=your-region
-#S3_BUCKET_NAME=your-bucket-name
+S3_ACCESS_KEY=your_access_key
+S3_SECRET_KEY=your_secret_key
+S3_ENDPOINT_URL=https://your-endpoint-url
+S3_REGION=your-region
+S3_BUCKET_NAME=your-bucket-name
 
 
 # Google Cloud Storage Env Variables
@@ -46,4 +46,10 @@ API_KEY=your_api_key_here
 # Purpose: The base path for storage operations.
 # Default: GCP
 # Requirement: Optional.
-#STORAGE_PATH=GCP
+#STORAGE_PATH=GCP
+
+RUNPOD_API_KEY=your_runpod_api_key_here
+
+# RunPod and Whisper Configuration
+USE_RUNPOD=true
+WHISPER_CACHE_DIR=/app/whisper_cache
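A note on running outside Docker: these values have to reach the process environment before config.py is imported. A minimal bootstrap sketch using python-dotenv; the library choice is an assumption (config.py itself only reads plain os.environ), and the gating check simply mirrors the logic this PR adds to config.py:

import os

from dotenv import load_dotenv  # assumes `pip install python-dotenv`

load_dotenv()  # pulls .env (copied from .env.example) into os.environ

# Mirrors the USE_RUNPOD gating added to config.py in this PR.
use_runpod = os.environ.get("USE_RUNPOD", "false").lower() == "true"
if use_runpod and not os.environ.get("RUNPOD_API_KEY"):
    raise ValueError("USE_RUNPOD is enabled but RUNPOD_API_KEY is not set")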
1 change: 0 additions & 1 deletion Dockerfile
@@ -31,7 +31,6 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
     libaom-dev \
     libdav1d-dev \
     librav1e-dev \
-    libsvtav1-dev \
     libzimg-dev \
     libwebp-dev \
     git \
7 changes: 7 additions & 0 deletions config.py
@@ -24,6 +24,13 @@
 if not API_KEY:
     raise ValueError("API_KEY environment variable is not set")
 
+# Runpod API configuration
+RUNPOD_API_KEY = os.environ.get('RUNPOD_API_KEY')
+USE_RUNPOD = os.environ.get('USE_RUNPOD', 'false').lower() == 'true'
+RUNPOD_WEBHOOK_URL = os.environ.get('RUNPOD_WEBHOOK_URL') # Optional webhook URL for async processing
+RUNPOD_MAX_WAIT_TIME = int(os.environ.get('RUNPOD_MAX_WAIT_TIME', '600')) # Max wait time in seconds (default: 10 minutes)
+RUNPOD_POLL_INTERVAL = int(os.environ.get('RUNPOD_POLL_INTERVAL', '5')) # Poll interval in seconds (default: 5 seconds)
+
 # Storage path setting
 LOCAL_STORAGE_PATH = os.environ.get('LOCAL_STORAGE_PATH', '/tmp')
 
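The two timing knobs above imply a synchronous poll-until-done loop somewhere in the RunPod client. A sketch of how such a loop might consume them, assuming RunPod's serverless status endpoint (GET /v2/{endpoint_id}/status/{job_id} with Bearer auth); the endpoint ID and helper name are illustrative, not taken from this PR:

import time

import requests

from config import RUNPOD_API_KEY, RUNPOD_MAX_WAIT_TIME, RUNPOD_POLL_INTERVAL

def wait_for_runpod_job(endpoint_id: str, job_id: str) -> dict:
    """Poll a RunPod serverless job until it finishes or the deadline passes."""
    url = f"https://api.runpod.ai/v2/{endpoint_id}/status/{job_id}"
    headers = {"Authorization": f"Bearer {RUNPOD_API_KEY}"}
    deadline = time.monotonic() + RUNPOD_MAX_WAIT_TIME

    while time.monotonic() < deadline:
        payload = requests.get(url, headers=headers, timeout=30).json()
        if payload.get("status") in ("COMPLETED", "FAILED"):
            return payload
        time.sleep(RUNPOD_POLL_INTERVAL)

    raise TimeoutError(f"RunPod job {job_id} exceeded {RUNPOD_MAX_WAIT_TIME}s")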
22 changes: 22 additions & 0 deletions docker-compose-custom-build.yml
@@ -0,0 +1,22 @@
version: '3.8'

services:
  nca-toolkit:
    build:
      context: .
      dockerfile: Dockerfile
    environment:
      - API_KEY=${API_KEY}
      - PYTHONUNBUFFERED=1
      - WHISPER_CACHE_DIR=/app/whisper_cache
      - S3_SECRET_KEY=${S3_SECRET_KEY}
      - RUNPOD_API_KEY=${RUNPOD_API_KEY}
      - S3_BUCKET_NAME=${S3_BUCKET_NAME}
      - S3_ENDPOINT_URL=${S3_ENDPOINT_URL}
      - S3_REGION=${S3_REGION}
      - S3_ACCESS_KEY=${S3_ACCESS_KEY}
      - USE_RUNPOD=true
    working_dir: /app
    ports:
      - "8080:8080"
    restart: unless-stopped
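After `docker compose -f docker-compose-custom-build.yml up --build`, a quick smoke test against the mapped port might look like the following. Both the /v1/toolkit/test path and the x-api-key header are assumptions about the toolkit's API, not anything defined in this compose file:

import os

import requests

resp = requests.get(
    "http://localhost:8080/v1/toolkit/test",  # hypothetical health/test route
    headers={"x-api-key": os.environ["API_KEY"]},
    timeout=10,
)
print(resp.status_code, resp.text)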
6 changes: 4 additions & 2 deletions routes/v1/media/media_transcribe.py
@@ -42,7 +42,8 @@
         "language": {"type": "string"},
         "webhook_url": {"type": "string", "format": "uri"},
         "id": {"type": "string"},
-        "words_per_line": {"type": "integer", "minimum": 1}
+        "words_per_line": {"type": "integer", "minimum": 1},
+        "initial_prompt": {"type": "string"}
     },
     "required": ["media_url"],
     "additionalProperties": False
@@ -58,13 +59,14 @@ def transcribe(job_id, data):
     response_type = data.get('response_type', 'direct')
     language = data.get('language', None)
     webhook_url = data.get('webhook_url')
+    initial_prompt = data.get('initial_prompt', None)
     id = data.get('id')
     words_per_line = data.get('words_per_line', None)
 
     logger.info(f"Job {job_id}: Received transcription request for {media_url}")
 
     try:
-        result = process_transcribe_media(media_url, task, include_text, include_srt, include_segments, word_timestamps, response_type, language, job_id, words_per_line)
+        result = process_transcribe_media(media_url, task, include_text, include_srt, include_segments, word_timestamps, response_type, language, job_id, words_per_line, initial_prompt)
         logger.info(f"Job {job_id}: Transcription process completed successfully")
 
         # If the result is a file path, upload it using the unified upload_file() method
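The new initial_prompt field lets callers seed Whisper's decoder with domain vocabulary. A sketch of a request exercising it, assuming the blueprint is mounted at /v1/media/transcribe and authenticated with an x-api-key header (both are assumptions from the surrounding repo layout):

import os

import requests

payload = {
    "media_url": "https://example.com/audio/interview.mp3",  # illustrative URL
    "initial_prompt": "Technical terms: RunPod, Whisper, NCA Toolkit.",
    "words_per_line": 8,
}
resp = requests.post(
    "http://localhost:8080/v1/media/transcribe",  # assumed mount point
    json=payload,
    headers={"x-api-key": os.environ["API_KEY"]},
    timeout=600,
)
resp.raise_for_status()
print(resp.json())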
207 changes: 207 additions & 0 deletions routes/webhook.py
@@ -0,0 +1,207 @@
# Copyright (c) 2025 Stephen G. Pope
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

from flask import Blueprint, request, jsonify
import logging
import json
from typing import Dict, Any

# Set up logging
logger = logging.getLogger(__name__)

# Create the blueprint
webhook_bp = Blueprint('webhook', __name__, url_prefix='/v1/webhook')

# In-memory storage for webhook results (in production, use Redis or database)
webhook_results = {}
Comment on lines +28 to +29 (Copilot AI, Aug 8, 2025):

Using in-memory storage for webhook results will lose data on application restart. The comment mentions using Redis or a database in production, but this should either be implemented or at least carry a prominent warning in the documentation.

Suggested change:

-# In-memory storage for webhook results (in production, use Redis or database)
-webhook_results = {}
+# WARNING: In-memory storage for webhook results.
+# All webhook results will be lost on application restart or crash.
+# DO NOT USE THIS IN PRODUCTION. Use Redis or a persistent database instead.
+webhook_results = {}
+logging.warning(
+    "USING IN-MEMORY STORAGE FOR WEBHOOK RESULTS. "
+    "ALL DATA WILL BE LOST ON APPLICATION RESTART. "
+    "DO NOT USE THIS IN PRODUCTION. Use Redis or a persistent database."
+)

(A Redis-backed alternative is sketched after this file.)

@webhook_bp.route('/runpod', methods=['POST'])
def handle_runpod_webhook():
    """
    Handle webhook callbacks from Runpod API.

    This endpoint receives the results of async Runpod transcription jobs
    and stores them for later retrieval.
    """
    try:
        # Get the JSON payload from Runpod
        data = request.get_json()

        if not data:
            logger.error("No JSON data received in webhook")
            return jsonify({"error": "No JSON data received"}), 400

        # Extract job information
        job_id = data.get('id')
        status = data.get('status')
        output = data.get('output')
        error = data.get('error')

        logger.info(f"Received webhook for job {job_id} with status: {status}")

        if not job_id:
            logger.error("No job ID in webhook data")
            return jsonify({"error": "No job ID provided"}), 400

        # Store the result
        webhook_results[job_id] = {
            'job_id': job_id,
            'status': status,
            'output': output,
            'error': error,
            'timestamp': data.get('timestamp'),
            'raw_data': data
        }

        if status == 'COMPLETED':
            logger.info(f"Job {job_id} completed successfully")
        elif status == 'FAILED':
            logger.error(f"Job {job_id} failed: {error}")
        else:
            logger.info(f"Job {job_id} status update: {status}")

        # Return success response
        return jsonify({
            "status": "received",
            "job_id": job_id,
            "message": f"Webhook processed for job {job_id}"
        }), 200

    except Exception as e:
        logger.error(f"Error processing webhook: {str(e)}")
        return jsonify({"error": "Internal server error"}), 500

@webhook_bp.route('/runpod/status/<job_id>', methods=['GET'])
def get_webhook_result(job_id: str):
    """
    Get the result of a Runpod job from webhook storage.

    Args:
        job_id: The job ID to get results for

    Returns:
        JSON response with job results or status
    """
    try:
        if job_id not in webhook_results:
            return jsonify({
                "error": "Job not found",
                "job_id": job_id,
                "message": "Job ID not found in webhook results"
            }), 404

        result = webhook_results[job_id]

        if result['status'] == 'COMPLETED':
            # Transform the output to Whisper format if needed
            from services.runpod_whisper import runpod_client

            try:
                transformed_result = runpod_client._transform_runpod_response(result['output'])
                return jsonify({
                    "status": "completed",
                    "job_id": job_id,
                    "result": transformed_result
                }), 200
            except Exception as transform_error:
                logger.error(f"Error transforming result for job {job_id}: {str(transform_error)}")
                return jsonify({
                    "status": "completed",
                    "job_id": job_id,
                    "raw_output": result['output'],
                    "transform_error": str(transform_error)
                }), 200

        elif result['status'] == 'FAILED':
            return jsonify({
                "status": "failed",
                "job_id": job_id,
                "error": result['error']
            }), 200

        else:
            return jsonify({
                "status": result['status'],
                "job_id": job_id,
                "message": f"Job is {result['status']}"
            }), 200

    except Exception as e:
        logger.error(f"Error getting webhook result for job {job_id}: {str(e)}")
        return jsonify({"error": "Internal server error"}), 500

@webhook_bp.route('/runpod/jobs', methods=['GET'])
def list_webhook_jobs():
    """
    List all jobs stored in webhook results.

    Returns:
        JSON response with list of job IDs and their statuses
    """
    try:
        jobs = []
        for job_id, result in webhook_results.items():
            jobs.append({
                "job_id": job_id,
                "status": result['status'],
                "timestamp": result['timestamp']
            })

        return jsonify({
            "jobs": jobs,
            "total": len(jobs)
        }), 200

    except Exception as e:
        logger.error(f"Error listing webhook jobs: {str(e)}")
        return jsonify({"error": "Internal server error"}), 500

@webhook_bp.route('/runpod/cleanup', methods=['POST'])
def cleanup_webhook_results():
    """
    Clean up completed or failed jobs from webhook storage.

    Returns:
        JSON response with cleanup results
    """
    try:
        data = request.get_json() or {}
        cleanup_completed = data.get('cleanup_completed', True)
        cleanup_failed = data.get('cleanup_failed', True)

        cleaned_jobs = []
        for job_id in list(webhook_results.keys()):
            result = webhook_results[job_id]
            should_clean = False

            if cleanup_completed and result['status'] == 'COMPLETED':
                should_clean = True
            elif cleanup_failed and result['status'] == 'FAILED':
                should_clean = True

            if should_clean:
                del webhook_results[job_id]
                cleaned_jobs.append(job_id)

        return jsonify({
            "cleaned_jobs": cleaned_jobs,
            "count": len(cleaned_jobs),
            "remaining_jobs": len(webhook_results)
        }), 200

    except Exception as e:
        logger.error(f"Error cleaning up webhook results: {str(e)}")
        return jsonify({"error": "Internal server error"}), 500
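Following up on the review comment above: a minimal Redis-backed replacement for the webhook_results dict, sketched with the standard redis-py client. Connection settings, key naming, and the TTL are assumptions; none of this code is part of the PR.

import json
from typing import Optional

import redis

# Assumed connection settings; in practice these would come from config.py.
store = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)

RESULT_TTL = 24 * 60 * 60  # expire stale results after a day

def save_result(job_id: str, result: dict) -> None:
    """Persist a webhook payload so it survives application restarts."""
    store.setex(f"webhook:runpod:{job_id}", RESULT_TTL, json.dumps(result))

def load_result(job_id: str) -> Optional[dict]:
    """Fetch a stored payload, or None if it expired or never arrived."""
    raw = store.get(f"webhook:runpod:{job_id}")
    return json.loads(raw) if raw else None

The TTL would also make the manual /runpod/cleanup sweep largely unnecessary, since expired keys disappear on their own.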
50 changes: 39 additions & 11 deletions services/ass_toolkit.py
@@ -26,9 +26,10 @@
 import re
 from services.file_management import download_file
 from services.cloud_storage import upload_file # Ensure this import is present
+from services.runpod_whisper import transcribe_with_runpod
 import requests # Ensure requests is imported for webhook handling
 from urllib.parse import urlparse
-from config import LOCAL_STORAGE_PATH
+from config import LOCAL_STORAGE_PATH, USE_RUNPOD, RUNPOD_API_KEY
 
 # Initialize logger
 logger = logging.getLogger(__name__)
@@ -64,16 +65,43 @@ def rgb_to_ass_color(rgb_color):
 
 def generate_transcription(video_path, language='auto'):
     try:
-        model = whisper.load_model("base")
-        transcription_options = {
-            'word_timestamps': True,
-            'verbose': True,
-        }
-        if language != 'auto':
-            transcription_options['language'] = language
-        result = model.transcribe(video_path, **transcription_options)
-        logger.info(f"Transcription generated successfully for video: {video_path}")
-        return result
+        # Choose transcription method based on configuration
+        if USE_RUNPOD and RUNPOD_API_KEY:
+            logger.info("Using Runpod API for transcription")
+            # For Runpod, we need to pass the video URL instead of local path
+            # Assuming video_path could be either a local path or URL
+            if video_path.startswith(('http://', 'https://')):
+                video_url = video_path
+            else:
+                # If it's a local path, we would need to upload it first
+                # For now, we'll assume URLs are passed when using Runpod
+                logger.warning("Runpod requires a URL, but local path provided. Falling back to local Whisper.")
+                model = whisper.load_model("base")
+                transcription_options = {
+                    'word_timestamps': True,
+                    'verbose': True,
+                }
+                if language != 'auto':
+                    transcription_options['language'] = language
+                result = model.transcribe(video_path, **transcription_options)
+                logger.info(f"Transcription generated successfully for video: {video_path}")
+                return result
+
+            result = transcribe_with_runpod(video_url, model="turbo", language=language)
+            logger.info("Runpod transcription completed")
+            return result
+        else:
+            logger.info("Using local Whisper model for transcription")
+            model = whisper.load_model("base")
+            transcription_options = {
+                'word_timestamps': True,
+                'verbose': True,
+            }
+            if language != 'auto':
+                transcription_options['language'] = language
+            result = model.transcribe(video_path, **transcription_options)
+            logger.info(f"Transcription generated successfully for video: {video_path}")
+            return result
     except Exception as e:
         logger.error(f"Error in transcription: {str(e)}")
         raise
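One observation on the new generate_transcription: the local-Whisper fallback inside the RunPod branch duplicates the else branch line for line. A small helper would collapse the repetition; this is a refactoring sketch, not code from the PR:

def _transcribe_locally(video_path, language='auto'):
    """Shared local-Whisper path, used both as the default and as the RunPod fallback."""
    model = whisper.load_model("base")
    transcription_options = {
        'word_timestamps': True,
        'verbose': True,
    }
    if language != 'auto':
        transcription_options['language'] = language
    result = model.transcribe(video_path, **transcription_options)
    logger.info(f"Transcription generated successfully for video: {video_path}")
    return result

Both call sites would then reduce to return _transcribe_locally(video_path, language).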