-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
0 parents
commit 85d4482
Showing
8 changed files
with
614 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
# Copyright Jiaqi Liu | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
---
# CI/CD: mirrors this repository to the Hugging Face Space on every push to
# master (pull requests also trigger the workflow run).
name: CI/CD

on:
  pull_request:
  push:
    branches: [master]

jobs:
  sync-to-huggingface-space:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
        with:
          fetch-depth: 0  # full history, so the force-push below mirrors every commit
          lfs: true       # pull Git LFS objects (large model/media assets) too
      - name: Push to hub
        # Force-pushes local master onto the Space's main branch.
        # HF_TOKEN comes from repository secrets; GitHub masks it in logs.
        run: git push https://QubitPi:$HF_TOKEN@huggingface.co/spaces/QubitPi/tiger master:main -f
        env:
          HF_TOKEN: ${{ secrets.HF_TOKEN }}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
# Python virtual environment
.venv
# JetBrains IDE project metadata
.idea/
# macOS Finder metadata
.DS_Store
# Compiled Python bytecode
__pycache__
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,258 @@ | ||
import whisper | ||
from pytubefix import YouTube | ||
from pytubefix.cli import on_progress | ||
import requests | ||
import time | ||
import streamlit as st | ||
from streamlit_lottie import st_lottie | ||
import numpy as np | ||
import os | ||
from typing import Iterator | ||
from io import StringIO | ||
from utils import write_vtt, write_srt | ||
import ffmpeg | ||
from languages import LANGUAGES | ||
import torch | ||
from zipfile import ZipFile | ||
from io import BytesIO | ||
import base64 | ||
import pathlib | ||
import re | ||
|
||
st.set_page_config(page_title="Auto Subtitled Video Generator", page_icon=":movie_camera:", layout="wide")

# Run Whisper on the GPU when CUDA is available, otherwise fall back to CPU.
# (The original code also called torch.cuda.is_available() on its own line and
# discarded the result — that no-op statement has been removed.)
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
# Model options: tiny, base, small, medium, large
loaded_model = whisper.load_model("small", device=DEVICE)
# Sentinel meaning "no size selected yet"; compared against the UI choice in change_model().
current_size = "None"
|
||
|
||
|
||
# Define a function that we can use to load lottie files from a link. | ||
# Define a function that we can use to load lottie files from a link.
def load_lottieurl(url: str):
    """Fetch a Lottie animation JSON from *url*.

    Returns the parsed JSON dict, or None when the request fails or the
    server does not answer 200 — callers treat the animation as optional.
    """
    try:
        # A timeout keeps the Streamlit app from hanging forever if the
        # animation CDN is slow or unreachable (the original call had none).
        r = requests.get(url, timeout=10)
    except requests.RequestException:
        # Network failure: degrade gracefully, same as a non-200 response.
        return None
    if r.status_code != 200:
        return None
    return r.json()
|
||
# Directory of this script; all working files are kept next to it so repeated
# runs reuse the same folders.
APP_DIR = pathlib.Path(__file__).parent.absolute()

# local_youtube/output holds the downloaded video and extracted audio.
LOCAL_DIR = APP_DIR / "local_youtube"
LOCAL_DIR.mkdir(exist_ok=True)
save_dir = LOCAL_DIR / "output"
save_dir.mkdir(exist_ok=True)
|
||
|
||
|
||
# Page header: a narrow column with a decorative Lottie animation next to a
# wide column with the app title and usage notes.
col1, col2 = st.columns([1, 3])
with col1:
    # st_lottie tolerates None, so a failed fetch just leaves the slot empty.
    lottie = load_lottieurl("https://assets8.lottiefiles.com/packages/lf20_jh9gfdye.json")
    st_lottie(lottie)

with col2:
    st.write("""
    ## Auto Subtitled Video Generator
    ##### Input a YouTube video link and get a video with subtitles.
    ###### ➠ If you want to transcribe the video in its original language, select the task as "Transcribe"
    ###### ➠ If you want to translate the subtitles to English, select the task as "Translate"
    ###### I recommend starting with the base model and then experimenting with the larger models, the small and medium models often work well. """)
|
||
|
||
def download_video(link):
    """Download the highest-resolution stream of *link* into the output dir.

    Returns the path of the saved MP4 file.
    """
    stream = YouTube(link, on_progress_callback=on_progress).streams.get_highest_resolution()
    return stream.download(filename=f"{save_dir}/youtube_video.mp4")
|
||
|
||
def convert(seconds):
    """Format a duration given in seconds as an HH:MM:SS clock string."""
    t = time.gmtime(seconds)
    return f"{t.tm_hour:02d}:{t.tm_min:02d}:{t.tm_sec:02d}"
|
||
|
||
def change_model(current_size, size):
    """Return a Whisper model of the requested *size*.

    Loads a fresh model when *size* differs from *current_size*. When the
    sizes match, the module-level ``loaded_model`` is reused instead of
    raising — re-selecting the current size is a harmless no-op (the original
    raised a bare Exception here, which no caller handled).
    """
    if current_size != size:
        return whisper.load_model(size)
    return loaded_model
|
||
|
||
def inference(link, loaded_model, task):
    """Transcribe or translate the audio track of a YouTube video.

    Downloads the audio-only stream of *link*, runs *loaded_model* with the
    Whisper task matching *task* ("Transcribe" or "Translate"), and returns
    a tuple ``(text, vtt, srt, language)``.

    Raises ValueError for an unsupported *task* (after the download, matching
    the original control flow).
    """
    yt = YouTube(link, on_progress_callback=on_progress)
    path = yt.streams.get_audio_only().download(filename=f"{save_dir}/audio.mp3", mp3=True)

    # The two UI tasks map 1:1 onto Whisper tasks; everything else is shared,
    # so the previously duplicated branches are collapsed here.
    task_map = {"Transcribe": "transcribe", "Translate": "translate"}
    if task not in task_map:
        raise ValueError("Task not supported")

    results = loaded_model.transcribe(path, task=task_map[task], best_of=5)
    vtt = getSubs(results["segments"], "vtt", 80)
    srt = getSubs(results["segments"], "srt", 80)
    return results["text"], vtt, srt, results["language"]
|
||
|
||
def getSubs(segments: Iterator[dict], format: str, maxLineWidth: int) -> str:
    """Render Whisper *segments* as a subtitle document.

    *format* selects the writer ('vtt' or 'srt'); any other value raises.
    Lines are wrapped at *maxLineWidth* characters.
    """
    writers = {'vtt': write_vtt, 'srt': write_srt}
    if format not in writers:
        raise Exception("Unknown format " + format)

    buffer = StringIO()
    writers[format](segments, file=buffer, maxLineWidth=maxLineWidth)
    return buffer.getvalue()
|
||
|
||
def get_language_code(language):
    """Map a Whisper-detected language name to its code via LANGUAGES.

    Raises ValueError when the language is not in the table.
    """
    # EAFP lookup instead of the redundant `in LANGUAGES.keys()` pre-check.
    try:
        return LANGUAGES[language]
    except KeyError:
        raise ValueError("Language not supported") from None
|
||
|
||
def generate_subtitled_video(video, audio, transcript):
    """Burn *transcript* subtitles into *video*, mux with *audio*.

    Writes the result to "youtube_sub.mp4" and returns its raw bytes
    (st.video accepts bytes). The original returned an open file handle
    that was never closed; reading the bytes inside a context manager
    fixes that resource leak.
    """
    video_file = ffmpeg.input(video)
    audio_file = ffmpeg.input(audio)
    ffmpeg.concat(video_file.filter("subtitles", transcript), audio_file, v=1, a=1).output("youtube_sub.mp4").run(quiet=True, overwrite_output=True)
    with open("youtube_sub.mp4", "rb") as f:
        return f.read()
|
||
|
||
def _sentence_split(transcript):
    """Break *transcript* into blank-line-separated sentences, keeping the !?. punctuation."""
    parts = re.split("([!?.])", transcript)
    sentences = ["".join(pair) for pair in zip(parts[0::2], parts[1::2])]
    return "\n\n".join(sentences)


def _write_transcripts(text, vtt, srt):
    """Persist the three transcript flavours to the working directory for zipping."""
    for filename, content in (("transcript.txt", text), ("transcript.vtt", vtt), ("transcript.srt", srt)):
        with open(filename, "w+", encoding="utf8") as f:
            f.writelines(content)


def _offer_zip_download():
    """Bundle transcripts + subtitled video into a zip and render a base64 download link."""
    zip_name = "YouTube_transcripts_and_video.zip"
    with ZipFile(zip_name, "w") as bundle:
        for member in ("transcript.txt", "transcript.vtt", "transcript.srt", "youtube_sub.mp4"):
            bundle.write(member)
    with open(zip_name, "rb") as f:
        b64 = base64.b64encode(f.read()).decode()
    href = f"<a href=\"data:file/zip;base64,{b64}\" download='{zip_name}'>\
    Download Transcripts and Video\
    </a>"
    st.markdown(href, unsafe_allow_html=True)


def _process_video(link, loaded_model, task):
    """Shared Transcribe/Translate pipeline (the original duplicated this whole
    sequence in both branches): run inference, show the video, write the
    transcripts, burn in subtitles, and offer a zip download."""
    results = inference(link, loaded_model, task)
    video = download_video(link)
    # Validates the detected language; raises ValueError when unsupported.
    get_language_code(results[3])

    col3, col4 = st.columns(2)
    with col3:
        st.video(video)

    _write_transcripts(_sentence_split(results[0]), results[1], results[2])

    with col4:
        with st.spinner("Generating Subtitled Video"):
            video_with_subs = generate_subtitled_video(video, f"{save_dir}/audio.mp3", "transcript.srt")
            st.video(video_with_subs)
            st.balloons()

    _offer_zip_download()


def main():
    """Streamlit entry point: collect model size, link and task, then run the pipeline."""
    size = st.selectbox("Select Model Size (The larger the model, the more accurate the transcription will be, but it will take longer)", ["tiny", "base", "small", "medium", "large-v3"], index=1)
    loaded_model = change_model(current_size, size)
    st.write(f"Model is {'multilingual' if loaded_model.is_multilingual else 'English-only'} "
             f"and has {sum(np.prod(p.shape) for p in loaded_model.parameters()):,} parameters.")
    link = st.text_input("YouTube Link (The longer the video, the longer the processing time)", placeholder="Input YouTube link and press enter")
    task = st.selectbox("Select Task", ["Transcribe", "Translate"], index=0)
    if task == "Transcribe":
        if st.button("Transcribe"):
            with st.spinner("Transcribing the video..."):
                _process_video(link, loaded_model, task)
    elif task == "Translate":
        if st.button("Translate to English"):
            with st.spinner("Translating to English..."):
                _process_video(link, loaded_model, task)
    else:
        # Unreachable with the two-option selectbox above; kept as a safety net.
        st.info("Please select a task.")


if __name__ == "__main__":
    main()
|
Oops, something went wrong.