Files
rocm-whisper-webui/ui/app.py
2026-02-25 23:48:01 +03:00

495 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Gradio Web UI для Whisper транскрибации
- Загрузка видео/аудио файлов
- Обработка FFmpeg (извлечение аудио, изменение скорости)
- Вызов API транскрибации
- Сохранение результатов в SQLite с автоудалением через 6 часов
"""
import os
import sqlite3
import uuid
import tempfile
import subprocess
import requests
import gradio as gr
from datetime import datetime, timedelta
import json
# Конфигурация из переменных окружения
WHISPER_API_URL = os.getenv("WHISPER_API_URL", "http://whisper-api:8080/transcribe")
DB_RETENTION_HOURS = int(os.getenv("DB_RETENTION_HOURS", "6"))
DB_PATH = "/app/db/jobs.db"
# Поддерживаемые языки
LANGUAGES = [
"Автоопределение", "Английский", "Русский", "Испанский", "Французский",
"Немецкий", "Итальянский", "Португальский", "Польский", "Турецкий",
"Нидерландский", "Японский", "Корейский", "Китайский"
]
LANGUAGE_CODES = {
"Автоопределение": None,
"Английский": "en",
"Русский": "ru",
"Испанский": "es",
"Французский": "fr",
"Немецкий": "de",
"Итальянский": "it",
"Португальский": "pt",
"Польский": "pl",
"Турецкий": "tr",
"Нидерландский": "nl",
"Японский": "ja",
"Корейский": "ko",
"Китайский": "zh"
}
def init_database():
"""Инициализация SQLite базы данных"""
os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS jobs (
id TEXT PRIMARY KEY,
created_at DATETIME NOT NULL,
filename TEXT NOT NULL,
task_type TEXT NOT NULL,
language TEXT,
speed REAL NOT NULL,
transcript_text TEXT NOT NULL,
api_url TEXT NOT NULL
)
''')
conn.commit()
conn.close()
# Удаление старых записей при запуске
cleanup_old_jobs()
def cleanup_old_jobs():
"""Удаление записей старше DB_RETENTION_HOURS часов"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cutoff_time = datetime.now() - timedelta(hours=DB_RETENTION_HOURS)
cursor.execute(
"DELETE FROM jobs WHERE created_at < ?",
(cutoff_time.strftime("%Y-%m-%d %H:%M:%S"),)
)
deleted = cursor.rowcount
conn.commit()
conn.close()
if deleted > 0:
print(f"Удалено {deleted} старых записей из базы данных")
def save_job(job_id: str, filename: str, task_type: str, language: str,
speed: float, transcript_text: str):
"""Сохранение задачи в базу данных"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO jobs (id, created_at, filename, task_type, language, speed, transcript_text, api_url)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
''', (
job_id,
datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
filename,
task_type,
language,
speed,
transcript_text,
WHISPER_API_URL
))
conn.commit()
conn.close()
def get_history():
"""Получение истории задач"""
cleanup_old_jobs() # Очистка при каждом запросе истории
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute('''
SELECT id, created_at, filename, task_type, language, speed,
substr(transcript_text, 1, 50) as preview
FROM jobs
ORDER BY created_at DESC
LIMIT 50
''')
rows = cursor.fetchall()
conn.close()
if not rows:
return None
# Форматирование для DataFrame
data = []
for row in rows:
data.append({
"ID": row[0][:8],
"Дата": row[1],
"Файл": row[2],
"Задача": row[3],
"Язык": row[4] or "Auto",
"Скорость": f"{row[5]}x",
"Текст": row[6] + "..." if len(row[6]) >= 50 else row[6]
})
return data
def check_api_health():
"""Проверка доступности API"""
try:
api_base = WHISPER_API_URL.split("/transcribe")[0]
response = requests.get(f"{api_base}/health", timeout=5)
if response.status_code == 200:
return True, "Подключено"
except Exception as e:
return False, f"Ошибка: {str(e)}"
return False, "Недоступно"
def process_audio(file_obj, speed, task_type, language, progress=gr.Progress()):
"""
Обработка аудио/видео файла:
1. Извлечение аудио из видео (если нужно)
2. Изменение скорости аудио
3. Отправка на API транскрибации
4. Сохранение результата в БД
"""
if file_obj is None:
raise gr.Error("Пожалуйста, загрузите файл")
progress(0, desc="Инициализация...")
# Генерация уникального ID задачи
job_id = str(uuid.uuid4())
# Определение расширения файла
original_filename = os.path.basename(file_obj.name)
file_ext = os.path.splitext(original_filename)[1].lower()
# Создание временных файлов
with tempfile.TemporaryDirectory() as temp_dir:
input_path = os.path.join(temp_dir, f"input{file_ext}")
output_path = os.path.join(temp_dir, "processed.mp3")
# Копирование загруженного файла
with open(input_path, "wb") as f:
with open(file_obj.name, "rb") as original:
f.write(original.read())
progress(10, desc="Обработка аудио (FFmpeg)...")
# Определение кодека для извлечения аудио
audio_ext = file_ext.lstrip('.')
video_exts = ['.mp4', '.mkv', '.avi', '.mov', '.webm', '.flv']
# Если это видео - извлекаем аудио
if file_ext in video_exts:
# Извлечение аудио из видео
cmd_extract = [
"ffmpeg", "-y", "-i", input_path,
"-vn", "-acodec", "libmp3lame", "-q:a", "2",
output_path
]
subprocess.run(cmd_extract, capture_output=True, check=True)
input_path = output_path
# Применение изменения скорости (atempo)
# FFmpeg atempo поддерживает диапазон 0.5-2.0
speed = float(speed)
if speed != 1.0:
# Если скорость вне диапазона atempo, используем несколько проходов
atempo_values = []
remaining_speed = speed
while remaining_speed > 2.0:
atempo_values.append("2.0")
remaining_speed /= 2.0
while remaining_speed < 0.5:
atempo_values.append("0.5")
remaining_speed /= 0.5
atempo_values.append(str(round(remaining_speed, 2)))
# Применяем atempo фильтр
temp_output = os.path.join(temp_dir, "sped.mp3")
cmd_speed = [
"ffmpeg", "-y", "-i", input_path,
"-filter:a", f"atempo={','.join(atempo_values)}",
"-acodec", "libmp3lame", "-q:a", "2",
temp_output
]
subprocess.run(cmd_speed, capture_output=True, check=True)
output_path = temp_output
else:
# Просто конвертируем в mp3 если это аудио
if file_ext not in video_exts:
cmd_convert = [
"ffmpeg", "-y", "-i", input_path,
"-acodec", "libmp3lame", "-q:a", "2",
output_path
]
subprocess.run(cmd_convert, capture_output=True, check=True)
progress(50, desc="Отправка на сервер транскрибации...")
# Чтение обработанного аудио
with open(output_path, "rb") as audio_file:
audio_data = audio_file.read()
# Подготовка данных для API
files = {
"file": ("audio.mp3", audio_data, "audio/mpeg")
}
data = {
"task": task_type,
}
# Добавляем язык если выбран
if language and language != "Автоопределение":
data["language"] = LANGUAGE_CODES.get(language)
progress(70, desc="Транскрибация...")
# Отправка запроса к API
try:
response = requests.post(
WHISPER_API_URL,
files=files,
data=data,
timeout=600 # 10 минут таймаут
)
if response.status_code != 200:
error_msg = response.text
raise gr.Error(f"Ошибка API: {response.status_code} - {error_msg}")
result = response.json()
except requests.exceptions.ConnectionError:
raise gr.Error(f"Не удалось подключиться к API: {WHISPER_API_URL}")
except requests.exceptions.Timeout:
raise gr.Error("Превышен таймаут ожидания ответа от API")
progress(90, desc="Сохранение результатов...")
# Сохранение в базу данных
transcript_text = result.get("text", "")
detected_language = result.get("language", language)
save_job(
job_id=job_id,
filename=original_filename,
task_type=task_type,
language=detected_language,
speed=speed,
transcript_text=transcript_text
)
progress(100, desc="Готово!")
# Возвращаем результаты
# Читаем аудио для проигрывания
with open(output_path, "rb") as f:
audio_bytes = f.read()
return (
output_path, # Audio для проигрывания
transcript_text, # Текст транскрибации
json.dumps(result.get("segments", []), ensure_ascii=False, indent=2), # Сегменты
f"Готово! Язык: {detected_language}, Задача: {task_type}" # Статус
)
def create_ui():
"""Создание Gradio интерфейса"""
# Инициализация БД
init_database()
# Проверка статуса API
api_ok, api_status = check_api_health()
with gr.Blocks(
title="Whisper Speed Transcribe",
theme=gr.themes.Base(
primary_hue="orange",
secondary_hue="gray",
),
css="""
.main-header {
text-align: center;
padding: 20px;
background: linear-gradient(90deg, #1a1a2e 0%, #16213e 100%);
border-radius: 10px;
margin-bottom: 20px;
}
.status-connected {
color: #10b981;
font-weight: bold;
}
.status-disconnected {
color: #ef4444;
font-weight: bold;
}
"""
) as app:
# Заголовок
gr.HTML("""
<div class="main-header">
<h1>🎙️ Whisper Speed Transcribe</h1>
<p>Транскрибация и перевод аудио/видео с изменением скорости</p>
</div>
""")
# Статус API
with gr.Row():
with gr.Column(scale=3):
gr.Markdown(f"**Статус API:** {'🟢 Подключено' if api_ok else '🔴 Не подключено'}")
with gr.Column(scale=1):
gr.Markdown(f"**URL API:** `{WHISPER_API_URL}`")
gr.HTML("<hr>")
# Основной интерфейс
with gr.Row():
# Левая колонка - настройки
with gr.Column(scale=1):
gr.Markdown("### 📁 Загрузка файла")
file_input = gr.File(
label="Видео или аудио файл",
file_count="single",
file_types=[".mp4", ".mkv", ".avi", ".mov", ".webm", ".mp3", ".wav", ".m4a", ".flac", ".ogg"]
)
gr.Markdown("### ⚙️ Настройки обработки")
with gr.Accordion("Настройки аудио", open=True):
speed_slider = gr.Slider(
label="Скорость воспроизведения",
minimum=0.5,
maximum=2.0,
step=0.05,
value=1.25,
info="Ускоряет аудио без изменения высоты тона"
)
with gr.Accordion("Настройки Whisper", open=True):
task_radio = gr.Radio(
label="Задача",
choices=["transcribe", "translate"],
value="transcribe",
info="transcribe - расшифровка, translate - перевод на английский"
)
language_dropdown = gr.Dropdown(
label="Язык (опционально)",
choices=LANGUAGES,
value="Автоопределение",
info="Если не выбрано - определяется автоматически"
)
process_btn = gr.Button(
"🚀 Обработать файл",
variant="primary",
size="lg"
)
status_output = gr.Textbox(
label="Статус",
interactive=False
)
# Правая колонка - результаты
with gr.Column(scale=1):
gr.Markdown("### 🔊 Обработанное аудио")
audio_output = gr.Audio(
label="Аудио после обработки",
interactive=False
)
gr.Markdown("### 📝 Результат транскрибации")
text_output = gr.Textbox(
label="Текст",
lines=10,
interactive=False
)
with gr.Accordion("Сегменты (JSON)", open=False):
segments_output = gr.JSON(
label="Детальные сегменты"
)
gr.HTML("<hr>")
# История задач
gr.Markdown("### 📋 История задач")
with gr.Row():
refresh_btn = gr.Button("🔄 Обновить историю")
history_table = gr.Dataframe(
headers=["ID", "Дата", "Файл", "Задача", "Язык", "Скорость", "Текст"],
datatype=["str", "str", "str", "str", "str", "str", "str"],
label="Последние задачи",
max_height=300
)
def load_history():
data = get_history()
if data:
return data
return []
# Загрузка истории при запуске
history_table.value = load_history()
# Обработчики событий
process_btn.click(
fn=process_audio,
inputs=[file_input, speed_slider, task_radio, language_dropdown],
outputs=[audio_output, text_output, segments_output, status_output]
)
refresh_btn.click(
fn=load_history,
outputs=[history_table]
)
# Автообновление истории каждые 30 секунд
app.load(fn=load_history, outputs=[history_table])
return app
if __name__ == "__main__":
print(f"Запуск Gradio UI...")
print(f"API URL: {WHISPER_API_URL}")
print(f"Хранение задач: {DB_RETENTION_HOURS} часов")
app = create_ui()
app.launch(
server_name="0.0.0.0",
server_port=7860,
share=False
)