From 42f463e3cd224bb9994691a6105320558f00bfa1 Mon Sep 17 00:00:00 2001 From: Vignesh Date: Fri, 8 Aug 2025 22:47:23 +0530 Subject: [PATCH 1/4] Implement Runpod API integration for transcription and add webhook handling --- .env.example | 4 +- config.py | 7 + routes/v1/media/media_transcribe.py | 6 +- routes/webhook.py | 207 ++++++++++++++++ services/ass_toolkit.py | 50 +++- services/runpod_whisper.py | 327 ++++++++++++++++++++++++++ services/transcription.py | 54 +++-- services/v1/media/media_transcribe.py | 92 ++++++-- 8 files changed, 688 insertions(+), 59 deletions(-) create mode 100644 routes/webhook.py create mode 100644 services/runpod_whisper.py diff --git a/.env.example b/.env.example index e8af750f..9344f25e 100644 --- a/.env.example +++ b/.env.example @@ -46,4 +46,6 @@ API_KEY=your_api_key_here # Purpose: The base path for storage operations. # Default: GCP # Requirement: Optional. -#STORAGE_PATH=GCP \ No newline at end of file +#STORAGE_PATH=GCP + +RUNPOD_API_KEY=your_runpod_api_key_here \ No newline at end of file diff --git a/config.py b/config.py index 2b4e9826..b7c38327 100644 --- a/config.py +++ b/config.py @@ -24,6 +24,13 @@ if not API_KEY: raise ValueError("API_KEY environment variable is not set") +# Runpod API configuration +RUNPOD_API_KEY = os.environ.get('RUNPOD_API_KEY') +USE_RUNPOD = os.environ.get('USE_RUNPOD', 'false').lower() == 'true' +RUNPOD_WEBHOOK_URL = os.environ.get('RUNPOD_WEBHOOK_URL') # Optional webhook URL for async processing +RUNPOD_MAX_WAIT_TIME = int(os.environ.get('RUNPOD_MAX_WAIT_TIME', '600')) # Max wait time in seconds (default: 10 minutes) +RUNPOD_POLL_INTERVAL = int(os.environ.get('RUNPOD_POLL_INTERVAL', '5')) # Poll interval in seconds (default: 5 seconds) + # Storage path setting LOCAL_STORAGE_PATH = os.environ.get('LOCAL_STORAGE_PATH', '/tmp') diff --git a/routes/v1/media/media_transcribe.py b/routes/v1/media/media_transcribe.py index 9609210b..805c1b7d 100644 --- a/routes/v1/media/media_transcribe.py +++ b/routes/v1/media/media_transcribe.py @@ -42,7 +42,8 @@ "language": {"type": "string"}, "webhook_url": {"type": "string", "format": "uri"}, "id": {"type": "string"}, - "words_per_line": {"type": "integer", "minimum": 1} + "words_per_line": {"type": "integer", "minimum": 1}, + "initial_prompt": {"type": "string"} }, "required": ["media_url"], "additionalProperties": False @@ -58,13 +59,14 @@ def transcribe(job_id, data): response_type = data.get('response_type', 'direct') language = data.get('language', None) webhook_url = data.get('webhook_url') + initial_prompt = data.get('initial_prompt', None) id = data.get('id') words_per_line = data.get('words_per_line', None) logger.info(f"Job {job_id}: Received transcription request for {media_url}") try: - result = process_transcribe_media(media_url, task, include_text, include_srt, include_segments, word_timestamps, response_type, language, job_id, words_per_line) + result = process_transcribe_media(media_url, task, include_text, include_srt, include_segments, word_timestamps, response_type, language, job_id, words_per_line, initial_prompt) logger.info(f"Job {job_id}: Transcription process completed successfully") # If the result is a file path, upload it using the unified upload_file() method diff --git a/routes/webhook.py b/routes/webhook.py new file mode 100644 index 00000000..f88c043b --- /dev/null +++ b/routes/webhook.py @@ -0,0 +1,207 @@ +# Copyright (c) 2025 Stephen G. 
Pope
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+from flask import Blueprint, request, jsonify
+import logging
+import json
+from typing import Dict, Any
+
+# Set up logging
+logger = logging.getLogger(__name__)
+
+# Create the blueprint
+webhook_bp = Blueprint('webhook', __name__, url_prefix='/v1/webhook')
+
+# In-memory storage for webhook results (in production, use Redis or database)
+webhook_results = {}
+
+@webhook_bp.route('/runpod', methods=['POST'])
+def handle_runpod_webhook():
+    """
+    Handle webhook callbacks from Runpod API.
+
+    This endpoint receives the results of async Runpod transcription jobs
+    and stores them for later retrieval.
+    """
+    try:
+        # Get the JSON payload from Runpod
+        data = request.get_json()
+
+        if not data:
+            logger.error("No JSON data received in webhook")
+            return jsonify({"error": "No JSON data received"}), 400
+
+        # Extract job information
+        job_id = data.get('id')
+        status = data.get('status')
+        output = data.get('output')
+        error = data.get('error')
+
+        logger.info(f"Received webhook for job {job_id} with status: {status}")
+
+        if not job_id:
+            logger.error("No job ID in webhook data")
+            return jsonify({"error": "No job ID provided"}), 400
+
+        # Store the result
+        webhook_results[job_id] = {
+            'job_id': job_id,
+            'status': status,
+            'output': output,
+            'error': error,
+            'timestamp': data.get('timestamp'),
+            'raw_data': data
+        }
+
+        if status == 'COMPLETED':
+            logger.info(f"Job {job_id} completed successfully")
+        elif status == 'FAILED':
+            logger.error(f"Job {job_id} failed: {error}")
+        else:
+            logger.info(f"Job {job_id} status update: {status}")
+
+        # Return success response
+        return jsonify({
+            "status": "received",
+            "job_id": job_id,
+            "message": f"Webhook processed for job {job_id}"
+        }), 200
+
+    except Exception as e:
+        logger.error(f"Error processing webhook: {str(e)}")
+        return jsonify({"error": "Internal server error"}), 500
+
+@webhook_bp.route('/runpod/status/<job_id>', methods=['GET'])
+def get_webhook_result(job_id: str):
+    """
+    Get the result of a Runpod job from webhook storage.
+ + Args: + job_id: The job ID to get results for + + Returns: + JSON response with job results or status + """ + try: + if job_id not in webhook_results: + return jsonify({ + "error": "Job not found", + "job_id": job_id, + "message": "Job ID not found in webhook results" + }), 404 + + result = webhook_results[job_id] + + if result['status'] == 'COMPLETED': + # Transform the output to Whisper format if needed + from services.runpod_whisper import runpod_client + + try: + transformed_result = runpod_client._transform_runpod_response(result['output']) + return jsonify({ + "status": "completed", + "job_id": job_id, + "result": transformed_result + }), 200 + except Exception as transform_error: + logger.error(f"Error transforming result for job {job_id}: {str(transform_error)}") + return jsonify({ + "status": "completed", + "job_id": job_id, + "raw_output": result['output'], + "transform_error": str(transform_error) + }), 200 + + elif result['status'] == 'FAILED': + return jsonify({ + "status": "failed", + "job_id": job_id, + "error": result['error'] + }), 200 + + else: + return jsonify({ + "status": result['status'], + "job_id": job_id, + "message": f"Job is {result['status']}" + }), 200 + + except Exception as e: + logger.error(f"Error getting webhook result for job {job_id}: {str(e)}") + return jsonify({"error": "Internal server error"}), 500 + +@webhook_bp.route('/runpod/jobs', methods=['GET']) +def list_webhook_jobs(): + """ + List all jobs stored in webhook results. + + Returns: + JSON response with list of job IDs and their statuses + """ + try: + jobs = [] + for job_id, result in webhook_results.items(): + jobs.append({ + "job_id": job_id, + "status": result['status'], + "timestamp": result['timestamp'] + }) + + return jsonify({ + "jobs": jobs, + "total": len(jobs) + }), 200 + + except Exception as e: + logger.error(f"Error listing webhook jobs: {str(e)}") + return jsonify({"error": "Internal server error"}), 500 + +@webhook_bp.route('/runpod/cleanup', methods=['POST']) +def cleanup_webhook_results(): + """ + Clean up completed or failed jobs from webhook storage. 
+ + Returns: + JSON response with cleanup results + """ + try: + data = request.get_json() or {} + cleanup_completed = data.get('cleanup_completed', True) + cleanup_failed = data.get('cleanup_failed', True) + + cleaned_jobs = [] + for job_id in list(webhook_results.keys()): + result = webhook_results[job_id] + should_clean = False + + if cleanup_completed and result['status'] == 'COMPLETED': + should_clean = True + elif cleanup_failed and result['status'] == 'FAILED': + should_clean = True + + if should_clean: + del webhook_results[job_id] + cleaned_jobs.append(job_id) + + return jsonify({ + "cleaned_jobs": cleaned_jobs, + "count": len(cleaned_jobs), + "remaining_jobs": len(webhook_results) + }), 200 + + except Exception as e: + logger.error(f"Error cleaning up webhook results: {str(e)}") + return jsonify({"error": "Internal server error"}), 500 diff --git a/services/ass_toolkit.py b/services/ass_toolkit.py index 9b88376d..8ac02236 100644 --- a/services/ass_toolkit.py +++ b/services/ass_toolkit.py @@ -26,9 +26,10 @@ import re from services.file_management import download_file from services.cloud_storage import upload_file # Ensure this import is present +from services.runpod_whisper import transcribe_with_runpod import requests # Ensure requests is imported for webhook handling from urllib.parse import urlparse -from config import LOCAL_STORAGE_PATH +from config import LOCAL_STORAGE_PATH, USE_RUNPOD, RUNPOD_API_KEY # Initialize logger logger = logging.getLogger(__name__) @@ -64,16 +65,43 @@ def rgb_to_ass_color(rgb_color): def generate_transcription(video_path, language='auto'): try: - model = whisper.load_model("base") - transcription_options = { - 'word_timestamps': True, - 'verbose': True, - } - if language != 'auto': - transcription_options['language'] = language - result = model.transcribe(video_path, **transcription_options) - logger.info(f"Transcription generated successfully for video: {video_path}") - return result + # Choose transcription method based on configuration + if USE_RUNPOD and RUNPOD_API_KEY: + logger.info("Using Runpod API for transcription") + # For Runpod, we need to pass the video URL instead of local path + # Assuming video_path could be either a local path or URL + if video_path.startswith(('http://', 'https://')): + video_url = video_path + else: + # If it's a local path, we would need to upload it first + # For now, we'll assume URLs are passed when using Runpod + logger.warning("Runpod requires a URL, but local path provided. 
Falling back to local Whisper.") + model = whisper.load_model("base") + transcription_options = { + 'word_timestamps': True, + 'verbose': True, + } + if language != 'auto': + transcription_options['language'] = language + result = model.transcribe(video_path, **transcription_options) + logger.info(f"Transcription generated successfully for video: {video_path}") + return result + + result = transcribe_with_runpod(video_url, model="turbo", language=language) + logger.info("Runpod transcription completed") + return result + else: + logger.info("Using local Whisper model for transcription") + model = whisper.load_model("base") + transcription_options = { + 'word_timestamps': True, + 'verbose': True, + } + if language != 'auto': + transcription_options['language'] = language + result = model.transcribe(video_path, **transcription_options) + logger.info(f"Transcription generated successfully for video: {video_path}") + return result except Exception as e: logger.error(f"Error in transcription: {str(e)}") raise diff --git a/services/runpod_whisper.py b/services/runpod_whisper.py new file mode 100644 index 00000000..5954b459 --- /dev/null +++ b/services/runpod_whisper.py @@ -0,0 +1,327 @@ +# Copyright (c) 2025 Stephen G. Pope +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +import os +import requests +import logging +import time +from typing import Dict, Any, Optional + +# Set up logging +logger = logging.getLogger(__name__) + +class RunpodWhisperClient: + """Client for interacting with Runpod Whisper API service.""" + + def __init__(self): + self.api_key = os.environ.get('RUNPOD_API_KEY') + if not self.api_key: + raise ValueError("RUNPOD_API_KEY environment variable is not set") + + self.base_url = "https://api.runpod.ai/v2/n8j2ln49qh2n4x/run" + self.headers = { + "Authorization": f"Bearer {self.api_key}", + "Content-Type": "application/json" + } + + # Configuration from environment variables + self.webhook_url = os.environ.get('RUNPOD_WEBHOOK_URL') + self.max_wait_time = int(os.environ.get('RUNPOD_MAX_WAIT_TIME', '600')) + self.poll_interval = int(os.environ.get('RUNPOD_POLL_INTERVAL', '5')) + + def transcribe_audio(self, audio_url: str, model: str = "large-v3", language: Optional[str] = None, webhook_url: Optional[str] = None, initial_prompt: Optional[str] = None) -> Dict[str, Any]: + """ + Transcribe audio using Runpod Whisper API. 
+
+        Args:
+            audio_url: URL of the audio/video file to transcribe
+            model: Whisper model to use (default: "large-v3")
+            language: Language code for transcription (optional)
+            webhook_url: Webhook URL for async processing (optional)
+            initial_prompt: Optional text to prime the Whisper decoding (optional)
+
+        Returns:
+            Dictionary containing transcription results in Whisper format
+        """
+        try:
+            # Prepare the request payload
+            payload = {
+                "input": {
+                    "audio": audio_url,
+                    "model": model,
+                    "temperature": 0.0,  # Default temperature for transcription
+                    # "enable_vad": True,  # Enable Voice Activity Detection
+                }
+            }
+
+            # Add language if specified
+            if language and language != 'auto':
+                payload["input"]["language"] = language
+
+            if initial_prompt:
+                payload["input"]["initial_prompt"] = initial_prompt
+
+            # Add webhook if provided for async processing, or use default webhook
+            webhook_to_use = webhook_url or self.webhook_url
+            if webhook_to_use:
+                payload["webhook"] = webhook_to_use
+                logger.info(f"Using async processing with webhook: {webhook_to_use}")
+            else:
+                logger.info("No webhook provided, will use synchronous polling")
+
+            logger.info(f"Sending transcription request to Runpod API for audio: {audio_url}")
+            logger.info(f"Using model: {model}, language: {language}")
+
+            # Make the API call
+            response = requests.post(
+                self.base_url,
+                json=payload,
+                headers=self.headers,
+                timeout=300  # 5 minute timeout
+            )
+
+            response.raise_for_status()
+            result = response.json()
+
+            logger.info("Runpod API call successful")
+
+            # If a webhook was used, return job information for async processing
+            if webhook_to_use:
+                logger.info(f"Async job submitted. Job ID: {result.get('id', 'unknown')}")
+                return {
+                    "status": "submitted",
+                    "job_id": result.get("id"),
+                    "webhook_url": webhook_to_use,
+                    "async": True
+                }
+
+            # For synchronous processing, poll for results
+            return self._poll_for_results(result, self.max_wait_time, self.poll_interval)
+
+        except requests.exceptions.RequestException as e:
+            logger.error(f"Runpod API request failed: {str(e)}")
+            raise Exception(f"Runpod API request failed: {str(e)}")
+        except Exception as e:
+            logger.error(f"Error in Runpod transcription: {str(e)}")
+            raise
+
+    def _poll_for_results(self, initial_response: Dict[str, Any], max_wait_time: int = 600, poll_interval: int = 5) -> Dict[str, Any]:
+        """
+        Poll Runpod API for job completion when no webhook is provided.
+
+        Args:
+            initial_response: Initial response from Runpod API
+            max_wait_time: Maximum time to wait in seconds (default: 10 minutes)
+            poll_interval: Time between polls in seconds (default: 5 seconds)
+
+        Returns:
+            Dictionary containing transcription results in Whisper format
+        """
+        job_id = initial_response.get("id")
+        if not job_id:
+            # If no job ID, assume synchronous response with results
+            return self._transform_runpod_response(initial_response)
+
+        logger.info(f"Polling for job completion. Job ID: {job_id}")
+
+        status_url = f"https://api.runpod.ai/v2/n8j2ln49qh2n4x/status/{job_id}"
+        start_time = time.time()
+
+        while (time.time() - start_time) < max_wait_time:
+            try:
+                response = requests.get(status_url, headers=self.headers, timeout=30)
+                response.raise_for_status()
+                status_data = response.json()
+
+                status = status_data.get("status")
+                logger.info(f"Job status: {status}")
+
+                if status == "COMPLETED":
+                    output = status_data.get("output", {})
+                    logger.info("Job completed successfully")
+                    return self._transform_runpod_response(output)
+                elif status == "FAILED":
+                    error_msg = status_data.get("error", "Unknown error")
+                    logger.error(f"Job failed: {error_msg}")
+                    raise Exception(f"Runpod job failed: {error_msg}")
+                elif status in ["IN_QUEUE", "IN_PROGRESS"]:
+                    logger.info(f"Job still processing... waiting {poll_interval} seconds")
+                    time.sleep(poll_interval)
+                else:
+                    logger.warning(f"Unknown job status: {status}")
+                    time.sleep(poll_interval)
+
+            except requests.exceptions.RequestException as e:
+                logger.error(f"Error polling job status: {str(e)}")
+                time.sleep(poll_interval)
+
+        raise Exception(f"Job timed out after {max_wait_time} seconds")
+
+    def get_job_status(self, job_id: str) -> Dict[str, Any]:
+        """
+        Get the status of a Runpod job.
+
+        Args:
+            job_id: The job ID to check
+
+        Returns:
+            Dictionary containing job status information
+        """
+        try:
+            status_url = f"https://api.runpod.ai/v2/n8j2ln49qh2n4x/status/{job_id}"
+            response = requests.get(status_url, headers=self.headers, timeout=30)
+            response.raise_for_status()
+            return response.json()
+        except requests.exceptions.RequestException as e:
+            logger.error(f"Error getting job status: {str(e)}")
+            raise Exception(f"Error getting job status: {str(e)}")
+
+    def get_job_result(self, job_id: str) -> Dict[str, Any]:
+        """
+        Get the result of a completed Runpod job.
+
+        Args:
+            job_id: The job ID to get results for
+
+        Returns:
+            Dictionary containing transcription results in Whisper format
+        """
+        try:
+            status_data = self.get_job_status(job_id)
+
+            if status_data.get("status") == "COMPLETED":
+                output = status_data.get("output", {})
+                return self._transform_runpod_response(output)
+            elif status_data.get("status") == "FAILED":
+                error_msg = status_data.get("error", "Unknown error")
+                raise Exception(f"Job failed: {error_msg}")
+            else:
+                return {
+                    "status": status_data.get("status"),
+                    "job_id": job_id,
+                    "completed": False
+                }
+        except Exception as e:
+            logger.error(f"Error getting job result: {str(e)}")
+            raise
+
+    def _transform_runpod_response(self, runpod_result: Dict[str, Any]) -> Dict[str, Any]:
+        """
+        Transform Runpod response format to match local Whisper response format.
+
+        Args:
+            runpod_result: Response from Runpod API
+
+        Returns:
+            Dictionary in Whisper format with 'text' and 'segments' keys
+        """
+        try:
+            # Extract the main transcription text
+            text = runpod_result.get('transcription', '')
+
+            # Extract segments
+            segments = runpod_result.get('segments', [])
+
+            # Transform segments to match Whisper format
+            whisper_segments = []
+            for segment in segments:
+                whisper_segment = {
+                    'id': segment.get('id', 0),
+                    'start': segment.get('start', 0.0),
+                    'end': segment.get('end', 0.0),
+                    'text': segment.get('text', ''),
+                    'words': []  # Runpod doesn't provide word-level timestamps by default
+                }
+
+                # If word-level data exists in the future, we can enhance this
+                # For now, we generate approximate word timestamps if needed
+                if segment.get('text'):
+                    words_list = segment['text'].strip().split()
+                    if words_list:
+                        segment_duration = segment.get('end', 0.0) - segment.get('start', 0.0)
+                        word_duration = segment_duration / len(words_list) if len(words_list) > 0 else 0.0
+
+                        for i, word in enumerate(words_list):
+                            word_start = segment.get('start', 0.0) + (i * word_duration)
+                            word_end = word_start + word_duration
+
+                            whisper_segment['words'].append({
+                                'word': f' {word}',  # Whisper format includes leading space
+                                'start': word_start,
+                                'end': word_end,
+                                'probability': 0.8  # Default probability since we don't have actual data
+                            })
+
+                whisper_segments.append(whisper_segment)
+
+            # Create the final result in Whisper format
+            whisper_result = {
+                'text': text,
+                'segments': whisper_segments,
+                'language': runpod_result.get('detected_language', 'en')
+            }
+
+            logger.info(f"Transformed Runpod response: {len(whisper_segments)} segments, language: {whisper_result['language']}")
+            logger.info(f"Generated approximate word timestamps for {sum(len(seg['words']) for seg in whisper_segments)} words")
+
+            return whisper_result
+
+        except Exception as e:
+            logger.error(f"Error transforming Runpod response: {str(e)}")
+            raise Exception(f"Error transforming Runpod response: {str(e)}")
+
+# Global instance for easy access
+runpod_client = RunpodWhisperClient()
+
+def transcribe_with_runpod(audio_url: str, model: str = "large-v3", language: Optional[str] = None, webhook_url: Optional[str] = None, initial_prompt: Optional[str] = None) -> Dict[str, Any]:
+    """
+    Convenience function to transcribe audio using Runpod.
+
+    Args:
+        audio_url: URL of the audio/video file to transcribe
+        model: Whisper model to use (default: "large-v3")
+        language: Language code for transcription (optional)
+        webhook_url: Webhook URL for async processing (optional)
+        initial_prompt: Optional text to prime the Whisper decoding (optional)
+
+    Returns:
+        Dictionary containing transcription results in Whisper format
+        or job information if webhook is provided
+    """
+    return runpod_client.transcribe_audio(audio_url, model, language, webhook_url, initial_prompt)
+
+def get_runpod_job_status(job_id: str) -> Dict[str, Any]:
+    """
+    Get the status of a Runpod job.
+
+    Args:
+        job_id: The job ID to check
+
+    Returns:
+        Dictionary containing job status information
+    """
+    return runpod_client.get_job_status(job_id)
+
+def get_runpod_job_result(job_id: str) -> Dict[str, Any]:
+    """
+    Get the result of a completed Runpod job.
+ + Args: + job_id: The job ID to get results for + + Returns: + Dictionary containing transcription results in Whisper format + """ + return runpod_client.get_job_result(job_id) diff --git a/services/transcription.py b/services/transcription.py index 0691be0d..36162ade 100644 --- a/services/transcription.py +++ b/services/transcription.py @@ -22,6 +22,8 @@ from datetime import timedelta from whisper.utils import WriteSRT, WriteVTT from services.file_management import download_file +from services.runpod_whisper import transcribe_with_runpod +from config import USE_RUNPOD, RUNPOD_API_KEY import logging import uuid @@ -35,23 +37,44 @@ def process_transcription(media_url, output_type, max_chars=56, language=None,): """Transcribe media and return the transcript, SRT or ASS file path.""" logger.info(f"Starting transcription for media URL: {media_url} with output type: {output_type}") - input_filename = download_file(media_url, os.path.join(STORAGE_PATH, 'input_media')) - logger.info(f"Downloaded media to local file: {input_filename}") - + logger.info(f"Checking Runpod configuration: USE_RUNPOD={USE_RUNPOD}, RUNPOD_API_KEY={'✅ Set' if RUNPOD_API_KEY else '❌ Not set'}") try: - model = whisper.load_model("base") - logger.info("Loaded Whisper model") - - # result = model.transcribe(input_filename) - # logger.info("Transcription completed") + # Choose transcription method based on configuration + if USE_RUNPOD and RUNPOD_API_KEY: + logger.info("Using Runpod API for transcription") + # For Runpod, we use the media URL directly + result = transcribe_with_runpod(media_url, model="turbo", language=language) + logger.info("Runpod transcription completed") + else: + logger.info("Using local Whisper model for transcription") + # Download file for local processing + input_filename = download_file(media_url, os.path.join(STORAGE_PATH, 'input_media')) + logger.info(f"Downloaded media to local file: {input_filename}") + + model = whisper.load_model("base") + logger.info("Loaded Whisper model") + + if output_type == 'ass': + result = model.transcribe( + input_filename, + word_timestamps=True, + task='transcribe', + verbose=False, + language=language + ) + else: + result = model.transcribe(input_filename, language=language) + + logger.info("Local transcription completed") + + # Clean up local file + os.remove(input_filename) + logger.info(f"Removed local file: {input_filename}") if output_type == 'transcript': - result = model.transcribe(input_filename, language=language) output = result['text'] logger.info("Generated transcript output") elif output_type in ['srt', 'vtt']: - - result = model.transcribe(input_filename) srt_subtitles = [] for i, segment in enumerate(result['segments'], start=1): start = timedelta(seconds=segment['start']) @@ -70,13 +93,6 @@ def process_transcription(media_url, output_type, max_chars=56, language=None,): logger.info(f"Generated {output_type.upper()} output: {output}") elif output_type == 'ass': - result = model.transcribe( - input_filename, - word_timestamps=True, - task='transcribe', - verbose=False - ) - logger.info("Transcription completed with word-level timestamps") # Generate ASS subtitle content ass_content = generate_ass_subtitle(result, max_chars) logger.info("Generated ASS subtitle content") @@ -92,8 +108,6 @@ def process_transcription(media_url, output_type, max_chars=56, language=None,): else: raise ValueError("Invalid output type. 
Must be 'transcript', 'srt', or 'vtt'.") - os.remove(input_filename) - logger.info(f"Removed local file: {input_filename}") logger.info(f"Transcription successful, output type: {output_type}") return output except Exception as e: diff --git a/services/v1/media/media_transcribe.py b/services/v1/media/media_transcribe.py index 4514a9f2..4b349f56 100644 --- a/services/v1/media/media_transcribe.py +++ b/services/v1/media/media_transcribe.py @@ -22,38 +22,81 @@ from datetime import timedelta from whisper.utils import WriteSRT, WriteVTT from services.file_management import download_file +from services.runpod_whisper import transcribe_with_runpod import logging -from config import LOCAL_STORAGE_PATH +from config import LOCAL_STORAGE_PATH, USE_RUNPOD, RUNPOD_API_KEY # Set up logging logger = logging.getLogger(__name__) logging.basicConfig(level=logging.INFO) -def process_transcribe_media(media_url, task, include_text, include_srt, include_segments, word_timestamps, response_type, language, job_id, words_per_line=None): +def process_transcribe_media(media_url, task, include_text, include_srt, include_segments, word_timestamps, response_type, language, job_id, words_per_line=None, initial_prompt=None): """Transcribe or translate media and return the transcript/translation, SRT or VTT file path.""" logger.info(f"Starting {task} for media URL: {media_url}") - input_filename = download_file(media_url, os.path.join(LOCAL_STORAGE_PATH, f"{job_id}_input")) - logger.info(f"Downloaded media to local file: {input_filename}") - + logger.info(f"Checking Runpod configuration: USE_RUNPOD={USE_RUNPOD}, RUNPOD_API_KEY={'✅ Set' if RUNPOD_API_KEY else '❌ Not set'}") + try: - # Load a larger model for better translation quality - #model_size = "large" if task == "translate" else "base" - model_size = "base" - model = whisper.load_model(model_size) - logger.info(f"Loaded Whisper {model_size} model") - - # Configure transcription/translation options - options = { - "task": task, - "word_timestamps": word_timestamps, - "verbose": False - } - - # Add language specification if provided - if language: - options["language"] = language - - result = model.transcribe(input_filename, **options) + # Choose transcription method based on configuration + if USE_RUNPOD and RUNPOD_API_KEY: + logger.info("Using Runpod API for transcription") + # For Runpod, we use the media URL directly + # Note: Runpod currently only supports transcription, not translation + if task == "translate": + logger.warning("Runpod doesn't support translation task. 
Falling back to local Whisper.") + # Fallback to local processing for translation + input_filename = download_file(media_url, os.path.join(LOCAL_STORAGE_PATH, f"{job_id}_input")) + logger.info(f"Downloaded media to local file: {input_filename}") + + model_size = "base" + model = whisper.load_model(model_size) + logger.info(f"Loaded Whisper {model_size} model for translation") + + options = { + "task": task, + "word_timestamps": word_timestamps, + "verbose": False, + "temperature": 0.0 # Default temperature for translation + } + if language: + options["language"] = language + + result = model.transcribe(input_filename, **options) + + # Clean up local file + os.remove(input_filename) + logger.info(f"Removed local file: {input_filename}") + else: + # Use Runpod for transcription + result = transcribe_with_runpod(media_url, model="large-v3", language=language, initial_prompt=initial_prompt) + logger.info("Runpod transcription completed") + else: + logger.info("Using local Whisper model") + # Download file for local processing + input_filename = download_file(media_url, os.path.join(LOCAL_STORAGE_PATH, f"{job_id}_input")) + logger.info(f"Downloaded media to local file: {input_filename}") + + # Load a larger model for better translation quality + model_size = "base" + model = whisper.load_model(model_size) + logger.info(f"Loaded Whisper {model_size} model") + + # Configure transcription/translation options + options = { + "task": task, + "word_timestamps": word_timestamps, + "verbose": False, + "temperature": 0.0 # Default temperature for transcription + } + + # Add language specification if provided + if language: + options["language"] = language + + result = model.transcribe(input_filename, **options) + + # Clean up local file + os.remove(input_filename) + logger.info(f"Removed local file: {input_filename}") # For translation task, the result['text'] will be in English text = None @@ -61,6 +104,7 @@ def process_transcribe_media(media_url, task, include_text, include_srt, include segments_json = None logger.info(f"Generated {task} output") + logger.debug(f"Transcription result object: {result}") if include_text is True: text = result['text'] @@ -121,8 +165,6 @@ def process_transcribe_media(media_url, task, include_text, include_srt, include if include_segments is True: segments_json = result['segments'] - os.remove(input_filename) - logger.info(f"Removed local file: {input_filename}") logger.info(f"{task.capitalize()} successful, output type: {response_type}") if response_type == "direct": From c04d34635d209acabf402797e5e542a8ac785d9d Mon Sep 17 00:00:00 2001 From: Vignesh Date: Wed, 20 Aug 2025 21:21:09 +0530 Subject: [PATCH 2/4] Remove libsvtav1-dev from Dockerfile dependencies --- Dockerfile | 1 - 1 file changed, 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index ae9529e7..89a0eb4f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -31,7 +31,6 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ libaom-dev \ libdav1d-dev \ librav1e-dev \ - libsvtav1-dev \ libzimg-dev \ libwebp-dev \ git \ From 8b71d8d9b0fa75b1d2bd32b75ea8ec79395fcbf7 Mon Sep 17 00:00:00 2001 From: Vignesh Date: Wed, 20 Aug 2025 23:54:36 +0530 Subject: [PATCH 3/4] Update environment configuration for S3 and RunPod integration in .env.example and add docker-compose-custom-build.yml --- .env.example | 17 ++++--- docker-compose-custom-build.yml | 82 +++++++++++++++++++++++++++++++++ 2 files changed, 93 insertions(+), 6 deletions(-) create mode 100644 docker-compose-custom-build.yml diff --git a/.env.example 
b/.env.example index 9344f25e..a60bf731 100644 --- a/.env.example +++ b/.env.example @@ -23,11 +23,11 @@ API_KEY=your_api_key_here # s3 Compatible Storage Env Vars # -#S3_ACCESS_KEY=your_access_key -#S3_SECRET_KEY=your_secret_key -#S3_ENDPOINT_URL=https://your-endpoint-url -#S3_REGION=your-region -#S3_BUCKET_NAME=your-bucket-name +S3_ACCESS_KEY=your_access_key +S3_SECRET_KEY=your_secret_key +S3_ENDPOINT_URL=https://your-endpoint-url +S3_REGION=your-region +S3_BUCKET_NAME=your-bucket-name # Google Cloud Storage Env Variables @@ -48,4 +48,9 @@ API_KEY=your_api_key_here # Requirement: Optional. #STORAGE_PATH=GCP -RUNPOD_API_KEY=your_runpod_api_key_here \ No newline at end of file +RUNPOD_API_KEY=your_runpod_api_key_here + +# RunPod and Whisper Configuration +USE_RUNPOD=true +WHISPER_MODEL=base +WHISPER_CACHE_DIR=/app/whisper_cache \ No newline at end of file diff --git a/docker-compose-custom-build.yml b/docker-compose-custom-build.yml new file mode 100644 index 00000000..d397c364 --- /dev/null +++ b/docker-compose-custom-build.yml @@ -0,0 +1,82 @@ +version: '3.8' + +services: + nca-toolkit: + build: + context: . + dockerfile: Dockerfile + hostname: 7f1388f0bda9 + user: appuser + environment: + - GPG_KEY=E3FF2839C048B25C084DEBE9B26995E310250568 + - API_KEY=${API_KEY} + - PYTHON_SHA256=8c136d199d3637a1fce98a16adc809c1d83c922d02d41f3614b34f8b6e7d38ec + - PYTHONUNBUFFERED=1 + - WHISPER_CACHE_DIR=/app/whisper_cache + - S3_SECRET_KEY=${S3_SECRET_KEY} + - LANG=C.UTF-8 + - PYTHON_VERSION=3.9.22 + - RUNPOD_API_KEY=${RUNPOD_API_KEY} + - S3_BUCKET_NAME=${S3_BUCKET_NAME} + - S3_ENDPOINT_URL=${S3_ENDPOINT_URL} + - S3_REGION=${S3_REGION} + - S3_ACCESS_KEY=${S3_ACCESS_KEY} + - USE_RUNPOD=true + - NVARCH=x86_64 + - NVIDIA_REQUIRE_CUDA=cuda>=12.8 brand=unknown,driver>=470,driver<471 brand=grid,driver>=470,driver<471 brand=tesla,driver>=470,driver<471 brand=nvidia,driver>=470,driver<471 brand=quadro,driver>=470,driver<471 brand=quadrortx,driver>=470,driver<471 brand=nvidiartx,driver>=470,driver<471 brand=vapps,driver>=470,driver<471 brand=vpc,driver>=470,driver<471 brand=vcs,driver>=470,driver<471 brand=vws,driver>=470,driver<471 brand=cloudgaming,driver>=470,driver<471 brand=unknown,driver>=535,driver<536 brand=grid,driver>=535,driver<536 brand=tesla,driver>=535,driver<536 brand=nvidia,driver>=535,driver<536 brand=quadro,driver>=535,driver<536 brand=quadrortx,driver>=535,driver<536 brand=nvidiartx,driver>=535,driver<536 brand=vapps,driver>=535,driver<536 brand=vpc,driver>=535,driver<536 brand=vcs,driver>=535,driver<536 brand=vws,driver>=535,driver<536 brand=cloudgaming,driver>=535,driver<536 brand=unknown,driver>=550,driver<551 brand=grid,driver>=550,driver<551 brand=tesla,driver>=550,driver<551 brand=nvidia,driver>=550,driver<551 brand=quadro,driver>=550,driver<551 brand=quadrortx,driver>=550,driver<551 brand=nvidiartx,driver>=550,driver<551 brand=vapps,driver>=550,driver<551 brand=vpc,driver>=550,driver<551 brand=vcs,driver>=550,driver<551 brand=vws,driver>=550,driver<551 brand=cloudgaming,driver>=550,driver<551 brand=unknown,driver>=560,driver<561 brand=grid,driver>=560,driver<561 brand=tesla,driver>=560,driver<561 brand=nvidia,driver>=560,driver<561 brand=quadro,driver>=560,driver<561 brand=quadrortx,driver>=560,driver<561 brand=nvidiartx,driver>=560,driver<561 brand=vapps,driver>=560,driver<561 brand=vpc,driver>=560,driver<561 brand=vcs,driver>=560,driver<561 brand=vws,driver>=560,driver<561 brand=cloudgaming,driver>=560,driver<561 brand=unknown,driver>=565,driver<566 brand=grid,driver>=565,driver<566 
brand=tesla,driver>=565,driver<566 brand=nvidia,driver>=565,driver<566 brand=quadro,driver>=565,driver<566 brand=quadrortx,driver>=565,driver<566 brand=nvidiartx,driver>=565,driver<566 brand=vapps,driver>=565,driver<566 brand=vpc,driver>=565,driver<566 brand=vcs,driver>=565,driver<566 brand=vws,driver>=565,driver<566 brand=cloudgaming,driver>=565,driver<566 + - NV_CUDA_CUDART_VERSION=12.8.57-1 + - CUDA_VERSION=12.8.0 + - LD_LIBRARY_PATH=/usr/local/nvidia/lib:/usr/local/nvidia/lib64 + - NVIDIA_VISIBLE_DEVICES=all + - NVIDIA_DRIVER_CAPABILITIES=compute,utility + - NV_CUDA_LIB_VERSION=12.8.0-1 + - NV_NVTX_VERSION=12.8.55-1 + - NV_LIBNPP_VERSION=12.3.3.65-1 + - NV_LIBNPP_PACKAGE=libnpp-12-8=12.3.3.65-1 + - NV_LIBCUSPARSE_VERSION=12.5.7.53-1 + - NV_LIBCUBLAS_PACKAGE_NAME=libcublas-12-8 + - NV_LIBCUBLAS_VERSION=12.8.3.14-1 + - NV_LIBCUBLAS_PACKAGE=libcublas-12-8=12.8.3.14-1 + - NV_LIBNCCL_PACKAGE_NAME=libnccl2 + - NV_LIBNCCL_PACKAGE_VERSION=2.25.1-1 + - NCCL_VERSION=2.25.1-1 + - NV_LIBNCCL_PACKAGE=libnccl2=2.25.1-1+cuda12.8 + - NVIDIA_PRODUCT_NAME=CUDA + - NV_CUDA_CUDART_DEV_VERSION=12.8.57-1 + - NV_NVML_DEV_VERSION=12.8.55-1 + - NV_LIBCUSPARSE_DEV_VERSION=12.5.7.53-1 + - NV_LIBNPP_DEV_VERSION=12.3.3.65-1 + - NV_LIBNPP_DEV_PACKAGE=libnpp-dev-12-8=12.3.3.65-1 + - NV_LIBCUBLAS_DEV_VERSION=12.8.3.14-1 + - NV_LIBCUBLAS_DEV_PACKAGE_NAME=libcublas-dev-12-8 + - NV_LIBCUBLAS_DEV_PACKAGE=libcublas-dev-12-8=12.8.3.14-1 + - NV_CUDA_NSIGHT_COMPUTE_VERSION=12.8.0-1 + - NV_CUDA_NSIGHT_COMPUTE_DEV_PACKAGE=cuda-nsight-compute-12-8=12.8.0-1 + - NV_NVPROF_VERSION=12.8.57-1 + - NV_NVPROF_DEV_PACKAGE=cuda-nvprof-12-8=12.8.57-1 + - NV_LIBNCCL_DEV_PACKAGE_NAME=libnccl-dev + - NV_LIBNCCL_DEV_PACKAGE_VERSION=2.25.1-1 + - NV_LIBNCCL_DEV_PACKAGE=libnccl-dev=2.25.1-1+cuda12.8 + - LIBRARY_PATH=/usr/local/cuda/lib64/stubs + - NV_CUDNN_VERSION=9.7.0.66-1 + - NV_CUDNN_PACKAGE_NAME=libcudnn9-cuda-12 + - NV_CUDNN_PACKAGE=libcudnn9-cuda-12=9.7.0.66-1 + - NV_CUDNN_PACKAGE_DEV=libcudnn9-dev-cuda-12=9.7.0.66-1 + - WHISPER_MODEL=base + working_dir: /app + ports: + - "8080:8080" + restart: unless-stopped + labels: + - "com.nvidia.cudnn.version=9.7.0.66-1" + - "maintainer=NVIDIA CORPORATION " + - "org.opencontainers.image.ref.name=ubuntu" + - "org.opencontainers.image.version=24.04" + runtime: nvidia + deploy: + resources: + reservations: + devices: + - driver: nvidia + count: all + capabilities: [gpu] From abe5230d6795d1501f44543a25b470eb85298799 Mon Sep 17 00:00:00 2001 From: Vignesh Date: Thu, 21 Aug 2025 01:02:37 +0530 Subject: [PATCH 4/4] Remove unused Whisper model configuration from environment files --- .env.example | 1 - docker-compose-custom-build.yml | 60 --------------------------------- 2 files changed, 61 deletions(-) diff --git a/.env.example b/.env.example index a60bf731..1cc24cd4 100644 --- a/.env.example +++ b/.env.example @@ -52,5 +52,4 @@ RUNPOD_API_KEY=your_runpod_api_key_here # RunPod and Whisper Configuration USE_RUNPOD=true -WHISPER_MODEL=base WHISPER_CACHE_DIR=/app/whisper_cache \ No newline at end of file diff --git a/docker-compose-custom-build.yml b/docker-compose-custom-build.yml index d397c364..d3f56252 100644 --- a/docker-compose-custom-build.yml +++ b/docker-compose-custom-build.yml @@ -5,78 +5,18 @@ services: build: context: . 
dockerfile: Dockerfile - hostname: 7f1388f0bda9 - user: appuser environment: - - GPG_KEY=E3FF2839C048B25C084DEBE9B26995E310250568 - API_KEY=${API_KEY} - - PYTHON_SHA256=8c136d199d3637a1fce98a16adc809c1d83c922d02d41f3614b34f8b6e7d38ec - PYTHONUNBUFFERED=1 - WHISPER_CACHE_DIR=/app/whisper_cache - S3_SECRET_KEY=${S3_SECRET_KEY} - - LANG=C.UTF-8 - - PYTHON_VERSION=3.9.22 - RUNPOD_API_KEY=${RUNPOD_API_KEY} - S3_BUCKET_NAME=${S3_BUCKET_NAME} - S3_ENDPOINT_URL=${S3_ENDPOINT_URL} - S3_REGION=${S3_REGION} - S3_ACCESS_KEY=${S3_ACCESS_KEY} - USE_RUNPOD=true - - NVARCH=x86_64 - - NVIDIA_REQUIRE_CUDA=cuda>=12.8 brand=unknown,driver>=470,driver<471 brand=grid,driver>=470,driver<471 brand=tesla,driver>=470,driver<471 brand=nvidia,driver>=470,driver<471 brand=quadro,driver>=470,driver<471 brand=quadrortx,driver>=470,driver<471 brand=nvidiartx,driver>=470,driver<471 brand=vapps,driver>=470,driver<471 brand=vpc,driver>=470,driver<471 brand=vcs,driver>=470,driver<471 brand=vws,driver>=470,driver<471 brand=cloudgaming,driver>=470,driver<471 brand=unknown,driver>=535,driver<536 brand=grid,driver>=535,driver<536 brand=tesla,driver>=535,driver<536 brand=nvidia,driver>=535,driver<536 brand=quadro,driver>=535,driver<536 brand=quadrortx,driver>=535,driver<536 brand=nvidiartx,driver>=535,driver<536 brand=vapps,driver>=535,driver<536 brand=vpc,driver>=535,driver<536 brand=vcs,driver>=535,driver<536 brand=vws,driver>=535,driver<536 brand=cloudgaming,driver>=535,driver<536 brand=unknown,driver>=550,driver<551 brand=grid,driver>=550,driver<551 brand=tesla,driver>=550,driver<551 brand=nvidia,driver>=550,driver<551 brand=quadro,driver>=550,driver<551 brand=quadrortx,driver>=550,driver<551 brand=nvidiartx,driver>=550,driver<551 brand=vapps,driver>=550,driver<551 brand=vpc,driver>=550,driver<551 brand=vcs,driver>=550,driver<551 brand=vws,driver>=550,driver<551 brand=cloudgaming,driver>=550,driver<551 brand=unknown,driver>=560,driver<561 brand=grid,driver>=560,driver<561 brand=tesla,driver>=560,driver<561 brand=nvidia,driver>=560,driver<561 brand=quadro,driver>=560,driver<561 brand=quadrortx,driver>=560,driver<561 brand=nvidiartx,driver>=560,driver<561 brand=vapps,driver>=560,driver<561 brand=vpc,driver>=560,driver<561 brand=vcs,driver>=560,driver<561 brand=vws,driver>=560,driver<561 brand=cloudgaming,driver>=560,driver<561 brand=unknown,driver>=565,driver<566 brand=grid,driver>=565,driver<566 brand=tesla,driver>=565,driver<566 brand=nvidia,driver>=565,driver<566 brand=quadro,driver>=565,driver<566 brand=quadrortx,driver>=565,driver<566 brand=nvidiartx,driver>=565,driver<566 brand=vapps,driver>=565,driver<566 brand=vpc,driver>=565,driver<566 brand=vcs,driver>=565,driver<566 brand=vws,driver>=565,driver<566 brand=cloudgaming,driver>=565,driver<566 - - NV_CUDA_CUDART_VERSION=12.8.57-1 - - CUDA_VERSION=12.8.0 - - LD_LIBRARY_PATH=/usr/local/nvidia/lib:/usr/local/nvidia/lib64 - - NVIDIA_VISIBLE_DEVICES=all - - NVIDIA_DRIVER_CAPABILITIES=compute,utility - - NV_CUDA_LIB_VERSION=12.8.0-1 - - NV_NVTX_VERSION=12.8.55-1 - - NV_LIBNPP_VERSION=12.3.3.65-1 - - NV_LIBNPP_PACKAGE=libnpp-12-8=12.3.3.65-1 - - NV_LIBCUSPARSE_VERSION=12.5.7.53-1 - - NV_LIBCUBLAS_PACKAGE_NAME=libcublas-12-8 - - NV_LIBCUBLAS_VERSION=12.8.3.14-1 - - NV_LIBCUBLAS_PACKAGE=libcublas-12-8=12.8.3.14-1 - - NV_LIBNCCL_PACKAGE_NAME=libnccl2 - - NV_LIBNCCL_PACKAGE_VERSION=2.25.1-1 - - NCCL_VERSION=2.25.1-1 - - NV_LIBNCCL_PACKAGE=libnccl2=2.25.1-1+cuda12.8 - - NVIDIA_PRODUCT_NAME=CUDA - - NV_CUDA_CUDART_DEV_VERSION=12.8.57-1 - - NV_NVML_DEV_VERSION=12.8.55-1 - - 
NV_LIBCUSPARSE_DEV_VERSION=12.5.7.53-1 - - NV_LIBNPP_DEV_VERSION=12.3.3.65-1 - - NV_LIBNPP_DEV_PACKAGE=libnpp-dev-12-8=12.3.3.65-1 - - NV_LIBCUBLAS_DEV_VERSION=12.8.3.14-1 - - NV_LIBCUBLAS_DEV_PACKAGE_NAME=libcublas-dev-12-8 - - NV_LIBCUBLAS_DEV_PACKAGE=libcublas-dev-12-8=12.8.3.14-1 - - NV_CUDA_NSIGHT_COMPUTE_VERSION=12.8.0-1 - - NV_CUDA_NSIGHT_COMPUTE_DEV_PACKAGE=cuda-nsight-compute-12-8=12.8.0-1 - - NV_NVPROF_VERSION=12.8.57-1 - - NV_NVPROF_DEV_PACKAGE=cuda-nvprof-12-8=12.8.57-1 - - NV_LIBNCCL_DEV_PACKAGE_NAME=libnccl-dev - - NV_LIBNCCL_DEV_PACKAGE_VERSION=2.25.1-1 - - NV_LIBNCCL_DEV_PACKAGE=libnccl-dev=2.25.1-1+cuda12.8 - - LIBRARY_PATH=/usr/local/cuda/lib64/stubs - - NV_CUDNN_VERSION=9.7.0.66-1 - - NV_CUDNN_PACKAGE_NAME=libcudnn9-cuda-12 - - NV_CUDNN_PACKAGE=libcudnn9-cuda-12=9.7.0.66-1 - - NV_CUDNN_PACKAGE_DEV=libcudnn9-dev-cuda-12=9.7.0.66-1 - - WHISPER_MODEL=base working_dir: /app ports: - "8080:8080" restart: unless-stopped - labels: - - "com.nvidia.cudnn.version=9.7.0.66-1" - - "maintainer=NVIDIA CORPORATION " - - "org.opencontainers.image.ref.name=ubuntu" - - "org.opencontainers.image.version=24.04" - runtime: nvidia - deploy: - resources: - reservations: - devices: - - driver: nvidia - count: all - capabilities: [gpu]
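---

Reviewer note: the sketch below walks the end-to-end flow this series adds (transcription with the new `initial_prompt` field, then the webhook store from `routes/webhook.py`). It is a minimal sketch, not part of the patch. The `POST /v1/media/transcribe` path and the `x-api-key` header are assumptions inferred from the repo layout and the `API_KEY` config, not shown in this diff; the lowercase `completed`/`failed` status values do match the webhook handler above.

```python
# Assumed: service on localhost:8080 (per docker-compose), route registered as
# POST /v1/media/transcribe, auth via an "x-api-key" header. Adjust as needed.
import time
import requests

BASE = "http://localhost:8080"
HEADERS = {"x-api-key": "your_api_key_here"}

# 1) Submit a transcription job; "initial_prompt" is the new optional field
#    forwarded to RunPod/Whisper to bias decoding toward expected vocabulary.
resp = requests.post(
    f"{BASE}/v1/media/transcribe",
    headers=HEADERS,
    json={
        "media_url": "https://example.com/audio.mp3",
        "task": "transcribe",
        "initial_prompt": "Vocabulary hints: RunPod, Whisper, NCA Toolkit.",
    },
    timeout=600,
)
resp.raise_for_status()
print(resp.json())

# 2) When RUNPOD_WEBHOOK_URL is set, RunPod posts results back to
#    POST /v1/webhook/runpod, and they can be read from the in-memory store.
#    "job-id-from-runpod" is a placeholder for the job ID returned above.
job_id = "job-id-from-runpod"
while True:
    status = requests.get(
        f"{BASE}/v1/webhook/runpod/status/{job_id}", headers=HEADERS, timeout=30
    ).json()
    if status.get("status") in ("completed", "failed"):
        break
    time.sleep(5)  # matches the RUNPOD_POLL_INTERVAL default
print(status)
```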