2026-05-19-00-11-40 新增Ubuntu配音工作流

This commit is contained in:
2026-05-19 00:22:10 +08:00
parent 6f63ae714c
commit ee8a28da78
12 changed files with 1034 additions and 0 deletions

View File

@@ -0,0 +1,38 @@
# Tools_scripts_XunFei-Ubuntu
Ubuntu 版配音工具,使用 Bash + Python + ffmpeg 替代 PowerShell。
## Install
```bash
sudo apt update
sudo apt install -y python3 python3-pip ffmpeg
python3 -m pip install -r Tools_scripts_XunFei-Ubuntu/requirements-ubuntu.txt
```
## Environment
```bash
export XF_APPID="your_app_id"
export XF_APIKEY="your_api_key"
export XF_APISECRET="your_api_secret"
```
## Generate Voice
```bash
./Tools_scripts_XunFei-Ubuntu/synthesize_xfyun_super_tts.sh \
--script 配音稿.md \
--output-dir 02_audio/super_tts \
--voice x5_lingfeiyi_flow \
--speed 50
```
## Build Final Video
```bash
python3 Tools_scripts_XunFei-Ubuntu/build_final_video_ubuntu.py \
--video input.mp4 \
--audio-dir 02_audio/super_tts \
--output 05_outputs/final_voiceover.mp4
```

View File

@@ -0,0 +1,232 @@
#!/usr/bin/env python3
"""Build a final voice-over video on Ubuntu with ffmpeg."""
from __future__ import annotations
import argparse
import shutil
import subprocess
from pathlib import Path
# Lower-case file suffixes (with the leading dot) that audio_files() accepts as audio.
AUDIO_EXTS = {".mp3", ".wav", ".m4a", ".aac", ".flac", ".ogg"}
def run(cmd: list[str]) -> None:
    """Echo *cmd* to stdout with a ``+ `` prefix, then execute it.

    Raises ``subprocess.CalledProcessError`` when the command exits non-zero.
    """
    echoed = " ".join(cmd)
    print(f"+ {echoed}")
    subprocess.run(cmd, check=True)
def require_tool(name: str) -> str:
    """Return the location of executable *name* as found by ``shutil.which``.

    Exits the program (``SystemExit``) with an install hint when the tool is
    not on PATH.
    """
    located = shutil.which(name)
    if located:
        return located
    raise SystemExit(f"{name} is required. Install it with: sudo apt install -y ffmpeg")
def media_duration(path: Path) -> float:
    """Return the duration of *path* in seconds as reported by ffprobe.

    Raises ``subprocess.CalledProcessError`` when ffprobe fails and
    ``ValueError`` when its output is not a number.
    """
    probe_cmd = [
        "ffprobe",
        "-v", "error",
        "-show_entries", "format=duration",
        # Print the bare value only: no section wrapper, no "duration=" key.
        "-of", "default=nw=1:nk=1",
        str(path),
    ]
    raw = subprocess.check_output(probe_cmd, text=True)
    return float(raw.strip())
def audio_files(audio_dir: Path) -> list[Path]:
    """Return the audio files directly inside *audio_dir* in natural order.

    Only regular files whose lower-cased suffix is in ``AUDIO_EXTS`` are
    returned. Names are sorted so that embedded numbers compare numerically
    ("2-intro.mp3" before "10-outro.mp3"); a plain lexicographic sort would
    put the two-digit clip first and scramble the narration order.

    Raises ``FileNotFoundError`` when no audio file is found.
    """
    import re  # local import: the module header does not import re

    def natural_key(path: Path):
        # re.split with a capture group yields text, digits, text, ... so every
        # odd position is a digit run; ints are only ever compared with ints.
        pieces = re.split(r"([0-9]+)", path.name)
        chunks = [int(piece) if pos % 2 else piece for pos, piece in enumerate(pieces)]
        # The raw name breaks ties such as "01.mp3" vs "1.mp3" deterministically.
        return (chunks, path.name)

    files = sorted(
        (
            path
            for path in audio_dir.iterdir()
            if path.is_file() and path.suffix.lower() in AUDIO_EXTS
        ),
        key=natural_key,
    )
    if not files:
        raise FileNotFoundError(f"No audio files found in {audio_dir}")
    return files
def concat_audio_dir(audio_dir: Path, work_dir: Path, silence: float) -> Path:
    """Join every audio file in *audio_dir* into one WAV inside *work_dir*.

    Each source is converted to 48 kHz stereo 16-bit PCM so that all pieces
    share one format, then the pieces are joined with ffmpeg's concat
    demuxer. When *silence* is positive, a silent clip of that many seconds
    is inserted between consecutive files (never before the first or after
    the last).

    Args:
        audio_dir: Directory scanned by ``audio_files``.
        work_dir: Directory for intermediate files; created if missing.
        silence: Gap length in seconds; zero or negative disables gaps.

    Returns:
        Path of ``combined_voice.wav`` inside *work_dir*.

    Raises:
        FileNotFoundError: propagated from ``audio_files`` when the directory
            holds no audio.
        subprocess.CalledProcessError: propagated from ``run`` when ffmpeg fails.
    """
    ffmpeg_base = ["ffmpeg", "-hide_banner", "-loglevel", "error", "-y"]
    pcm_out = ["-c:a", "pcm_s16le"]

    work_dir.mkdir(parents=True, exist_ok=True)
    # Resolve the inputs first so an empty directory fails before any ffmpeg run.
    sources = audio_files(audio_dir)

    gap_seconds = f"{silence:.3f}"
    # Only render the silence clip when it will really be inserted; a zero or
    # negative duration would otherwise be handed to ffmpeg for a file that
    # is never referenced.
    insert_gaps = silence > 0 and float(gap_seconds) > 0
    silence_path = work_dir / "silence.wav"
    if insert_gaps:
        run(
            [
                *ffmpeg_base,
                "-f", "lavfi",
                "-t", gap_seconds,
                "-i", "anullsrc=channel_layout=stereo:sample_rate=48000",
                *pcm_out,
                str(silence_path),
            ]
        )

    normalized: list[Path] = []
    for index, src in enumerate(sources, start=1):
        dst = work_dir / f"audio_{index:02d}.wav"
        run(
            [
                *ffmpeg_base,
                "-i", str(src),
                "-vn",
                "-ar", "48000",
                "-ac", "2",
                *pcm_out,
                str(dst),
            ]
        )
        normalized.append(dst)

    def concat_entry(path: Path) -> str:
        # Single quotes inside a quoted concat-list path are written as '\''.
        escaped = path.resolve().as_posix().replace("'", "'\\''")
        return f"file '{escaped}'\n"

    list_path = work_dir / "audio_concat.txt"
    with list_path.open("w", encoding="utf-8") as handle:
        for position, item in enumerate(normalized):
            if insert_gaps and position > 0:
                handle.write(concat_entry(silence_path))
            handle.write(concat_entry(item))

    out_audio = work_dir / "combined_voice.wav"
    run(
        [
            *ffmpeg_base,
            "-f", "concat",
            "-safe", "0",  # the list uses absolute paths
            "-i", str(list_path),
            *pcm_out,
            str(out_audio),
        ]
    )
    return out_audio
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the video builder from ``sys.argv``."""
    cli = argparse.ArgumentParser(description="Combine one video with voice-over audio.")
    # (flag, add_argument keyword arguments), registered in this order.
    options = (
        ("--video", {"type": Path, "required": True, "help": "Source video path."}),
        ("--audio", {"type": Path, "default": None, "help": "Single voice-over audio file."}),
        ("--audio-dir", {"type": Path, "default": None, "help": "Directory of ordered audio files."}),
        ("--output", {"type": Path, "default": Path("05_outputs/final_voiceover.mp4")}),
        ("--work-dir", {"type": Path, "default": Path("04_intermediate/ubuntu_voiceover")}),
        ("--silence", {"type": float, "default": 0.35, "help": "Gap seconds between audio files."}),
        ("--width", {"type": int, "default": 1920}),
        ("--height", {"type": int, "default": 1080}),
        ("--fps", {"type": int, "default": 30}),
        ("--crf", {"type": int, "default": 20}),
        ("--preset", {"default": "medium"}),
        ("--video-speed", {"type": float, "default": None, "help": "Override automatic speed."}),
    )
    for flag, spec in options:
        cli.add_argument(flag, **spec)
    return cli.parse_args()
def main() -> int:
    """Mux the source video with the voice-over track and write the final MP4.

    The video is retimed so that it lasts as long as the audio (or by the
    explicit ``--video-speed`` factor), scaled and padded to the requested
    frame size, and encoded with libx264 video and AAC audio. The output is
    cut at the audio duration.

    Returns:
        0 on success.

    Raises:
        SystemExit: ffmpeg/ffprobe missing, or not exactly one of
            ``--audio`` / ``--audio-dir`` given.
        FileNotFoundError: the video or the audio input does not exist.
        ValueError: ``--video-speed`` is not a finite number greater than 0.
        RuntimeError: ffprobe reports a non-positive duration.
    """
    args = parse_args()
    require_tool("ffmpeg")
    require_tool("ffprobe")

    if not args.video.exists():
        raise FileNotFoundError(args.video)
    if (args.audio is None) == (args.audio_dir is None):
        raise SystemExit("Use exactly one of --audio or --audio-dir.")

    # Compare against None, not truthiness: "--video-speed 0" must be rejected
    # instead of silently falling back to the automatic speed. Checking here
    # also fails before any slow ffmpeg work starts.
    manual_speed = args.video_speed
    if manual_speed is not None and not 0 < manual_speed < float("inf"):
        raise ValueError("--video-speed must be greater than 0.")

    args.work_dir.mkdir(parents=True, exist_ok=True)
    args.output.parent.mkdir(parents=True, exist_ok=True)

    if args.audio is not None:
        audio_path = args.audio
    else:
        audio_path = concat_audio_dir(args.audio_dir, args.work_dir, args.silence)
    if not audio_path or not audio_path.exists():
        raise FileNotFoundError(audio_path)

    video_duration = media_duration(args.video)
    audio_duration = media_duration(audio_path)
    if video_duration <= 0 or audio_duration <= 0:
        raise RuntimeError("Invalid media duration.")

    # Automatic speed stretches or squeezes the video to the audio length.
    speed = manual_speed if manual_speed is not None else video_duration / audio_duration

    print(f"video_duration={video_duration:.3f}s")
    print(f"audio_duration={audio_duration:.3f}s")
    print(f"video_speed={speed:.6f}x")

    filter_graph = (
        # Retime, fix the frame rate, then fit into the target frame with
        # black padding while keeping the aspect ratio.
        f"[0:v]setpts=PTS/{speed:.8f},fps={args.fps},"
        f"scale={args.width}:{args.height}:force_original_aspect_ratio=decrease,"
        f"pad={args.width}:{args.height}:(ow-iw)/2:(oh-ih)/2:black,"
        "setsar=1,format=yuv420p[v];"
        # apad keeps the audio stream going; the -t below bounds the output.
        "[1:a]aresample=48000,apad[a]"
    )
    run(
        [
            "ffmpeg",
            "-hide_banner",
            "-y",
            "-i", str(args.video),
            "-i", str(audio_path),
            "-filter_complex", filter_graph,
            "-map", "[v]",
            "-map", "[a]",
            "-t", f"{audio_duration:.3f}",
            "-c:v", "libx264",
            "-preset", args.preset,
            "-crf", str(args.crf),
            "-c:a", "aac",
            "-b:a", "192k",
            "-ar", "48000",
            "-ac", "2",
            "-movflags", "+faststart",
            str(args.output),
        ]
    )

    final_duration = media_duration(args.output)
    print(f"output={args.output}")
    print(f"final_duration={final_duration:.3f}s")
    return 0
# Script entry point: run main() and use its return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())

View File

@@ -0,0 +1,21 @@
#!/usr/bin/env bash
# Print the duration in seconds of every audio file under a path.
#
# Usage: <this script> [PATH]      (PATH defaults to 02_audio)
#
# Files that ffprobe cannot read, or whose duration is not numeric, are
# reported on stderr and skipped; the script then exits with status 1 after
# listing everything else instead of aborting at the first bad file.
set -euo pipefail

TARGET="${1:-02_audio}"

if ! command -v ffprobe >/dev/null 2>&1; then
  echo "ffprobe is required. Install it with: sudo apt install -y ffmpeg" >&2
  exit 1
fi

if [[ ! -e "$TARGET" ]]; then
  echo "Path not found: $TARGET" >&2
  exit 1
fi

failed=0

# The loop reads from a process substitution so it runs in the main shell and
# can update $failed. The extension list mirrors the suffixes accepted by the
# Python video builder (mp3, wav, m4a, aac, flac, ogg).
while IFS= read -r -d '' file; do
  if ! duration="$(ffprobe -v error -show_entries format=duration -of default=nw=1:nk=1 "$file")"; then
    echo "ffprobe failed: $file" >&2
    failed=1
    continue
  fi
  # ffprobe prints "N/A" when the container has no duration; printf %f would
  # reject that and kill the run under `set -e`.
  if [[ ! "$duration" =~ ^[0-9]+(\.[0-9]+)?$ ]]; then
    echo "unknown duration (${duration:-empty}): $file" >&2
    failed=1
    continue
  fi
  # Force the C locale so "12.345" parses even where the decimal mark is a comma.
  LC_ALL=C printf '%8.3fs %s\n' "$duration" "$file"
done < <(
  find "$TARGET" -type f \( \
    -iname '*.mp3' -o -iname '*.wav' -o -iname '*.m4a' -o \
    -iname '*.aac' -o -iname '*.flac' -o -iname '*.ogg' \
  \) -print0 | sort -z
)

exit "$failed"

View File

@@ -0,0 +1 @@
websocket-client>=1.8.0

View File

@@ -0,0 +1,5 @@
#!/usr/bin/env bash
# Thin wrapper: run xfyun_tts_ubuntu.py (located next to this script) in
# "super" mode and forward every command-line argument unchanged.
set -euo pipefail

here="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
tts_script="$here/xfyun_tts_ubuntu.py"

python3 "$tts_script" --mode super "$@"

View File

@@ -0,0 +1,5 @@
#!/usr/bin/env bash
# Thin wrapper: run xfyun_tts_ubuntu.py (located next to this script) in
# "normal" mode and forward every command-line argument unchanged.
set -euo pipefail

here="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
tts_script="$here/xfyun_tts_ubuntu.py"

python3 "$tts_script" --mode normal "$@"

View File

@@ -0,0 +1,356 @@
#!/usr/bin/env python3
"""Generate XFYUN TTS voice files on Ubuntu.
This script supports both the normal XFYUN online TTS endpoint and the
super-realistic TTS endpoint used by the PowerShell workflow.
"""
from __future__ import annotations
import argparse
import base64
import email.utils
import hashlib
import hmac
import json
import os
import re
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from urllib.parse import quote, urlparse
# WebSocket endpoint of the normal XFYUN online TTS service (used by synthesize_normal).
NORMAL_TTS_URL = "wss://tts-api.xfyun.cn/v2/tts"
# WebSocket endpoint of the super-realistic TTS service (used by synthesize_super).
SUPER_TTS_URL = "wss://cbm01.cn-huabei-1.xf-yun.com/v1/private/mcd9m97e6"
@dataclass(frozen=True)
class ScriptSegment:
    """One numbered section of the narration script, as produced by load_segments."""

    # Section number exactly as captured from the "## N. title" heading (a string, not an int).
    index: str
    # Heading text after the number, stripped; main() uses it in the output file name.
    title: str
    # Narration text of the section with blank, "#" and metadata lines removed.
    text: str
def safe_filename(value: str) -> str:
    """Remove the characters ``\\ / : * ? " < > |`` from *value* and trim it.

    Returns ``"segment"`` when nothing is left after cleaning.
    """
    # Mapping every forbidden code point to None makes str.translate delete it.
    forbidden = dict.fromkeys(map(ord, '\\/:*?"<>|'))
    stripped = value.translate(forbidden).strip()
    return stripped if stripped else "segment"
def find_default_script(cwd: Path) -> Path:
    """Pick the narration script among the ``*.md`` files directly in *cwd*.

    Candidates are considered in sorted order, in three tiers:
    1. names starting with "配音稿" or (case-insensitively) "voice";
    2. any file that is neither the workflow document ("配音生成工作流…")
       nor ``readme.md``;
    3. any Markdown file at all.
    The first file of the first non-empty tier is returned.

    Raises ``FileNotFoundError`` when *cwd* contains no Markdown file.
    """
    markdown = sorted(cwd.glob("*.md"))

    def is_preferred(path: Path) -> bool:
        name = path.name
        return name.startswith("配音稿") or name.lower().startswith("voice")

    def is_usable(path: Path) -> bool:
        name = path.name
        return not name.startswith("配音生成工作流") and name.lower() != "readme.md"

    tiers = (filter(is_preferred, markdown), filter(is_usable, markdown), markdown)
    for tier in tiers:
        for path in tier:
            return path
    raise FileNotFoundError("Cannot find script Markdown file. Use --script to specify one.")
def load_segments(script_path: Path) -> list[ScriptSegment]:
content = script_path.read_text(encoding="utf-8-sig")
pattern = re.compile(
r"(?ms)^##\s+([1-9])\.\s+(.+?)\r?\n(.*?)(?=^##\s+[1-9]\.\s+|\Z)"
)
matches = pattern.findall(content)
if not matches:
raise ValueError("Cannot find sections like '## 1. title' in script Markdown.")
segments: list[ScriptSegment] = []
metadata = re.compile(r"^(说明|时长|备注|镜头|画面|音色|语速|输出|提示)[:]")
for index, title, body in matches:
lines = []
for raw_line in body.splitlines():
line = raw_line.strip()
if not line or line.startswith("#") or metadata.match(line):
continue
lines.append(line)
text = "\n".join(lines).replace("\t", " ").strip()
if not text:
raise ValueError(f"Section {index} has no readable text.")
segments.append(ScriptSegment(index=index, title=title.strip(), text=text))
return segments
def build_auth_url(request_url: str, api_key: str, api_secret: str) -> str:
    """Return *request_url* with signed ``authorization``, ``date`` and ``host`` query parameters.

    The signature is an HMAC-SHA256 (keyed with *api_secret*) over the host,
    the current GMT date and the ``GET <path> HTTP/1.1`` request line; it is
    wrapped, together with *api_key*, in a base64-encoded authorization string.
    """
    parsed = urlparse(request_url)
    host = parsed.hostname or ""
    request_path = parsed.path or "/"
    timestamp = email.utils.formatdate(usegmt=True)

    # The three lines that get signed, in the order the header list names them.
    to_sign = "\n".join(
        (
            f"host: {host}",
            f"date: {timestamp}",
            f"GET {request_path} HTTP/1.1",
        )
    )
    mac = hmac.new(api_secret.encode("utf-8"), to_sign.encode("utf-8"), hashlib.sha256)
    signature = base64.b64encode(mac.digest()).decode("ascii")

    auth_fields = ", ".join(
        (
            f'api_key="{api_key}"',
            'algorithm="hmac-sha256"',
            'headers="host date request-line"',
            f'signature="{signature}"',
        )
    )
    token = base64.b64encode(auth_fields.encode("utf-8")).decode("ascii")

    query_pairs = (("authorization", token), ("date", timestamp), ("host", host))
    query = "&".join(f"{key}={quote(value)}" for key, value in query_pairs)
    return f"{request_url}?{query}"
def require_websocket():
    """Import and return the ``websocket`` module (package ``websocket-client``).

    The import is deferred to call time so that ``--dry-run`` works without
    the dependency. Exits with an install hint when the import fails.
    """
    try:
        import websocket as ws_module  # type: ignore
    except ImportError as exc:
        hint = (
            "Missing dependency: websocket-client. Install it with:\n"
            " python3 -m pip install -r Tools_scripts_XunFei-Ubuntu/requirements-ubuntu.txt"
        )
        raise SystemExit(hint) from exc
    return ws_module
def recv_json(socket: Any) -> dict[str, Any]:
    """Receive one message from *socket* and decode it as a JSON object.

    Args:
        socket: Object with a ``recv()`` method returning ``str`` or bytes
            (here: a websocket-client connection).

    Returns:
        The decoded JSON object.

    Raises:
        RuntimeError: the message is empty (websocket-client returns an empty
            string for non-data frames such as a close frame) or the JSON
            value is not an object.
        json.JSONDecodeError: the message is not valid JSON.
    """
    message = socket.recv()
    if isinstance(message, (bytes, bytearray)):
        message = message.decode("utf-8")
    if not message:
        # Without this check the caller only sees "Expecting value: line 1
        # column 1", which hides that the server dropped the connection.
        raise RuntimeError("XFYUN closed the connection before sending a complete response.")
    parsed = json.loads(message)
    if not isinstance(parsed, dict):
        # Callers use .get() on the result, so anything but an object is unusable.
        raise RuntimeError(f"Unexpected XFYUN response: {message[:200]}")
    return parsed
def synthesize_normal(
    *,
    text: str,
    out_file: Path,
    app_id: str,
    api_key: str,
    api_secret: str,
    voice: str,
    speed: int,
    volume: int,
    pitch: int,
) -> None:
    """Synthesize *text* through the normal XFYUN TTS endpoint into *out_file*.

    One request frame carrying the whole base64-encoded text is sent; audio
    chunks are collected until a response reports ``data.status == 2`` and
    are then written to *out_file* in one go, so a failed run leaves no
    partial file.

    Raises ``RuntimeError`` when the service returns a non-zero ``code`` or
    no audio at all.
    """
    ws_module = require_websocket()
    signed_url = build_auth_url(NORMAL_TTS_URL, api_key, api_secret)
    connection = ws_module.create_connection(signed_url, timeout=30)
    collected = bytearray()
    try:
        business = {
            "aue": "lame",
            "sfl": 1,
            "auf": "audio/L16;rate=16000",
            "vcn": voice,
            "speed": speed,
            "volume": volume,
            "pitch": pitch,
            "bgs": 0,
            "tte": "UTF8",
            "reg": "2",
            "rdn": "0",
        }
        encoded_text = base64.b64encode(text.encode("utf-8")).decode("ascii")
        request = {
            "common": {"app_id": app_id},
            "business": business,
            # status 2: this single frame carries the complete text.
            "data": {"status": 2, "text": encoded_text},
        }
        connection.send(json.dumps(request, ensure_ascii=False, separators=(",", ":")))

        finished = False
        while not finished:
            reply = recv_json(connection)
            if reply.get("code", 0) != 0:
                raise RuntimeError(
                    f"XFYUN normal TTS failed: code={reply.get('code')}, "
                    f"message={reply.get('message')}"
                )
            chunk = reply.get("data") or {}
            encoded_audio = chunk.get("audio")
            if encoded_audio:
                collected.extend(base64.b64decode(encoded_audio))
            finished = chunk.get("status") == 2
    finally:
        connection.close()

    if not collected:
        raise RuntimeError("No audio data returned by XFYUN normal TTS.")
    out_file.write_bytes(collected)
def synthesize_super(
    *,
    text: str,
    out_file: Path,
    app_id: str,
    api_key: str,
    api_secret: str,
    voice: str,
    speed: int,
    volume: int,
    pitch: int,
    raw_text: bool,
) -> None:
    """Synthesize *text* through the super-realistic XFYUN endpoint into *out_file*.

    The text is sent base64-encoded unless *raw_text* is true. Audio chunks
    are collected until either the response header or the audio payload
    reports ``status == 2`` and are then written to *out_file* in one go.

    Raises ``RuntimeError`` when the service reports a non-zero ``code``
    (in the header or at the top level) or returns no audio at all.
    """
    ws_module = require_websocket()
    signed_url = build_auth_url(SUPER_TTS_URL, api_key, api_secret)
    connection = ws_module.create_connection(signed_url, timeout=30)
    collected = bytearray()
    if raw_text:
        request_text = text
    else:
        request_text = base64.b64encode(text.encode("utf-8")).decode("ascii")
    try:
        oral_params = {
            "oral_level": "mid",
            "spark_assist": 1,
            "remain": 1,
        }
        audio_format = {
            "encoding": "lame",
            "sample_rate": 24000,
            "channels": 1,
            "bit_depth": 16,
            "frame_size": 0,
        }
        tts_params = {
            "vcn": voice,
            "speed": speed,
            "volume": volume,
            "pitch": pitch,
            "bgs": 0,
            "reg": 0,
            "rdn": 0,
            "rhy": 0,
            "watermask": 0,
            "implicit_watermark": False,
            "audio": audio_format,
        }
        text_part = {
            "encoding": "utf8",
            "compress": "raw",
            "format": "plain",
            "status": 2,
            "seq": 0,
            "text": request_text,
        }
        request = {
            "header": {"app_id": app_id, "status": 2},
            "parameter": {"oral": oral_params, "tts": tts_params},
            "payload": {"text": text_part},
        }
        connection.send(json.dumps(request, ensure_ascii=False, separators=(",", ":")))

        while True:
            reply = recv_json(connection)
            header = reply.get("header") or {}
            # An empty header yields the default code 0, so no separate emptiness check.
            if header.get("code", 0) != 0:
                raise RuntimeError(
                    f"XFYUN super TTS failed: code={header.get('code')}, "
                    f"message={header.get('message')}, sid={header.get('sid')}"
                )
            if reply.get("code", 0) != 0:
                raise RuntimeError(
                    f"XFYUN super TTS failed: code={reply.get('code')}, "
                    f"message={reply.get('message')}"
                )
            audio_part = (reply.get("payload") or {}).get("audio") or {}
            encoded_audio = audio_part.get("audio")
            if encoded_audio:
                collected.extend(base64.b64decode(encoded_audio))
            last_frame = header.get("status") == 2 or audio_part.get("status") == 2
            if last_frame:
                break
    finally:
        connection.close()

    if not collected:
        raise RuntimeError("No audio data returned by XFYUN super TTS.")
    out_file.write_bytes(collected)
def validate_range(name: str, value: int) -> None:
    """Raise ``ValueError`` (mentioning *name*) when *value* is outside 0..100 inclusive."""
    too_low = value < 0
    too_high = value > 100
    if too_low or too_high:
        raise ValueError(f"{name} must be between 0 and 100.")
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the TTS generator from ``sys.argv``."""
    cli = argparse.ArgumentParser(description="Generate XFYUN TTS audio on Ubuntu.")
    # (flag, add_argument keyword arguments), registered in this order.
    options = (
        ("--script", {"type": Path, "default": None, "help": "Markdown script path."}),
        ("--output-dir", {"type": Path, "default": Path("02_audio/xfyun_tts")}),
        ("--mode", {"choices": ["normal", "super"], "default": "super"}),
        ("--voice", {"default": None, "help": "XFYUN vcn voice name."}),
        ("--speed", {"type": int, "default": 50}),
        ("--volume", {"type": int, "default": 70}),
        ("--pitch", {"type": int, "default": 50}),
        ("--raw-text", {"action": "store_true", "help": "Use raw text for super TTS."}),
        ("--overwrite", {"action": "store_true", "help": "Overwrite existing mp3 files."}),
        ("--dry-run", {"action": "store_true", "help": "Only parse script and print plan."}),
    )
    for flag, spec in options:
        cli.add_argument(flag, **spec)
    return cli.parse_args()
def main() -> int:
    """Generate one audio file per script segment with the selected XFYUN mode.

    Prints the plan (script, mode, voice, segments) first; with ``--dry-run``
    it stops there, before credentials are read. Otherwise the credentials
    come from ``XF_APPID``, ``XF_APIKEY`` and ``XF_APISECRET``, and each
    segment is written to ``<output-dir>/<index>-<title>.mp3``. Existing
    files are skipped unless ``--overwrite`` is given.

    Returns 0 on success; exits via ``SystemExit`` when a credential is
    missing and raises ``ValueError`` for out-of-range speed/volume/pitch.
    """
    args = parse_args()
    for option in ("speed", "volume", "pitch"):
        validate_range(option, getattr(args, option))

    script_path = args.script or find_default_script(Path.cwd())
    segments = load_segments(script_path)

    # An explicit --voice wins; otherwise each mode has its own default voice.
    if args.voice:
        voice = args.voice
    elif args.mode == "normal":
        voice = "xiaoyan"
    else:
        voice = "x5_lingfeiyi_flow"

    print(f"script={script_path}")
    print(f"mode={args.mode}")
    print(f"voice={voice}")
    print(f"segments={len(segments)}")
    for segment in segments:
        print(f" {segment.index}. {segment.title} ({len(segment.text)} chars)")
    if args.dry_run:
        return 0

    credentials = {
        "app_id": os.environ.get("XF_APPID"),
        "api_key": os.environ.get("XF_APIKEY"),
        "api_secret": os.environ.get("XF_APISECRET"),
    }
    if not all(credentials.values()):
        raise SystemExit("Please set XF_APPID, XF_APIKEY and XF_APISECRET first.")

    args.output_dir.mkdir(parents=True, exist_ok=True)
    for segment in segments:
        out_file = args.output_dir / f"{segment.index}-{safe_filename(segment.title)}.mp3"
        if out_file.exists() and not args.overwrite:
            print(f"skip existing: {out_file}")
            continue
        print(f"synthesizing {segment.index}: {segment.title}")
        # Keyword arguments shared by both synthesis functions.
        request = dict(
            text=segment.text,
            out_file=out_file,
            voice=voice,
            speed=args.speed,
            volume=args.volume,
            pitch=args.pitch,
            **credentials,
        )
        if args.mode == "normal":
            synthesize_normal(**request)
        else:
            synthesize_super(raw_text=args.raw_text, **request)
        print(f"generated: {out_file}")

    print("all voice files generated")
    return 0
# Script entry point: exit with main()'s return value, or with 130 (the
# conventional status for an interrupted process) when the user presses Ctrl+C.
if __name__ == "__main__":
    try:
        exit_code = main()
    except KeyboardInterrupt:
        exit_code = 130
    raise SystemExit(exit_code)