Files
ISISeg/Tools_scripts_XunFei-Ubuntu/xfyun_tts_ubuntu.py

357 lines
12 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""Generate XFYUN TTS voice files on Ubuntu.
This script supports both the normal XFYUN online TTS endpoint and the
super-realistic TTS endpoint used by the PowerShell workflow.
"""
from __future__ import annotations
import argparse
import base64
import email.utils
import hashlib
import hmac
import json
import os
import re
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from urllib.parse import quote, urlparse
# WebSocket endpoint for the normal XFYUN online TTS service (used by synthesize_normal).
NORMAL_TTS_URL = "wss://tts-api.xfyun.cn/v2/tts"
# WebSocket endpoint for the super-realistic TTS service (used by synthesize_super).
SUPER_TTS_URL = "wss://cbm01.cn-huabei-1.xf-yun.com/v1/private/mcd9m97e6"
@dataclass(frozen=True)
class ScriptSegment:
    """One numbered section of the Markdown voice script.

    Instances are immutable; they are produced by the script parser and
    consumed when naming and synthesizing each audio file.
    """

    # Section number captured from the "## N. title" heading (kept as text).
    index: str
    # Heading title with surrounding whitespace removed.
    title: str
    # Narration text for the section, one spoken line per text line.
    text: str
def safe_filename(value: str) -> str:
    """Return *value* with characters that are unsafe in file names removed.

    The characters ``\\ / : * ? " < > |`` are dropped and surrounding
    whitespace is trimmed. If nothing is left, ``"segment"`` is returned so
    the caller always gets a usable name.
    """
    forbidden = str.maketrans("", "", '\\/:*?"<>|')
    stripped = value.translate(forbidden).strip()
    if stripped:
        return stripped
    return "segment"
def find_default_script(cwd: Path) -> Path:
    """Pick the Markdown script to use when ``--script`` is not given.

    Markdown files directly inside *cwd* are considered in sorted order and
    the first file of the first non-empty tier wins:

    1. names starting with ``配音稿`` or (case-insensitively) ``voice``;
    2. any file that is neither the workflow document (``配音生成工作流*``)
       nor ``readme.md``;
    3. any Markdown file at all.

    Raises:
        FileNotFoundError: if *cwd* contains no ``*.md`` file.
    """
    markdown_files = sorted(cwd.glob("*.md"))

    def is_preferred(name: str) -> bool:
        return name.startswith("配音稿") or name.lower().startswith("voice")

    def is_excluded(name: str) -> bool:
        return name.startswith("配音生成工作流") or name.lower() == "readme.md"

    tiers = (
        [item for item in markdown_files if is_preferred(item.name)],
        [item for item in markdown_files if not is_excluded(item.name)],
        markdown_files,
    )
    for tier in tiers:
        if tier:
            return tier[0]
    raise FileNotFoundError("Cannot find script Markdown file. Use --script to specify one.")
def load_segments(script_path: Path) -> list[ScriptSegment]:
    """Parse a Markdown voice script into its numbered narration segments.

    A segment starts at a heading of the form ``## N. title`` (N is a
    positive integer) and runs until the next such heading or the end of the
    file. Inside a segment, blank lines, Markdown heading lines and metadata
    lines (``说明:``, ``时长:`` ... written with either an ASCII or a
    fullwidth colon) are dropped; the remaining lines form the text to speak.

    Args:
        script_path: Markdown file to read (UTF-8, an optional BOM is ignored).

    Returns:
        The segments in the order they appear in the file.

    Raises:
        ValueError: if no ``## N. title`` section exists, or a section has no
            speakable text left after filtering.
    """
    content = script_path.read_text(encoding="utf-8-sig")

    # ``[1-9]\d*`` rather than a single digit: with only ``[1-9]`` a heading
    # such as "## 10. ..." was not seen as a section boundary and its
    # narration was silently merged into section 9.
    section_pattern = re.compile(
        r"(?ms)^##\s+([1-9]\d*)\.\s+(.+?)\r?\n(.*?)(?=^##\s+[1-9]\d*\.\s+|\Z)"
    )
    sections = section_pattern.findall(content)
    if not sections:
        raise ValueError("Cannot find sections like '## 1. title' in script Markdown.")

    # Accept both the ASCII colon and the fullwidth colon (U+FF1A) after a
    # metadata keyword; the latter is written as an escape so the source
    # contains no visually ambiguous character.
    metadata = re.compile(r"^(说明|时长|备注|镜头|画面|音色|语速|输出|提示)[:\uFF1A]")

    segments: list[ScriptSegment] = []
    for index, title, body in sections:
        stripped_lines = (raw_line.strip() for raw_line in body.splitlines())
        spoken = [
            line
            for line in stripped_lines
            if line and not line.startswith("#") and not metadata.match(line)
        ]
        text = "\n".join(spoken).replace("\t", " ").strip()
        if not text:
            raise ValueError(f"Section {index} has no readable text.")
        segments.append(ScriptSegment(index=index, title=title.strip(), text=text))
    return segments
def build_auth_url(request_url: str, api_key: str, api_secret: str) -> str:
    """Return *request_url* with signed authentication query parameters.

    The string ``host: <host>\\ndate: <date>\\nGET <path> HTTP/1.1`` is signed
    with HMAC-SHA256 using *api_secret*. The base64 signature is embedded in
    an authorization descriptor, which is itself base64-encoded and appended
    to the URL together with the date and host used for signing.

    Args:
        request_url: WebSocket endpoint without a query string.
        api_key: Key placed in the authorization descriptor.
        api_secret: Secret used as the HMAC key.

    Returns:
        ``<request_url>?authorization=...&date=...&host=...`` with each value
        percent-encoded.
    """
    parsed = urlparse(request_url)
    host = parsed.hostname or ""
    request_path = parsed.path or "/"
    # RFC 2822 style timestamp in GMT, e.g. "Thu, 01 Jan 2026 00:00:00 GMT".
    timestamp = email.utils.formatdate(usegmt=True)

    string_to_sign = "\n".join(
        (f"host: {host}", f"date: {timestamp}", f"GET {request_path} HTTP/1.1")
    )
    mac = hmac.new(
        api_secret.encode("utf-8"),
        string_to_sign.encode("utf-8"),
        hashlib.sha256,
    )
    signature = base64.b64encode(mac.digest()).decode("ascii")

    descriptor = (
        f'api_key="{api_key}", algorithm="hmac-sha256", '
        f'headers="host date request-line", signature="{signature}"'
    )
    token = base64.b64encode(descriptor.encode("utf-8")).decode("ascii")

    query = "&".join(
        f"{key}={quote(value)}"
        for key, value in (("authorization", token), ("date", timestamp), ("host", host))
    )
    return f"{request_url}?{query}"
def require_websocket():
    """Import and return the ``websocket`` module (websocket-client package).

    The import is deferred to call time so that parsing the script and
    ``--dry-run`` work without the dependency installed.

    Raises:
        SystemExit: with installation instructions if the module is missing.
    """
    try:
        import websocket  # type: ignore
    except ImportError as exc:
        install_hint = (
            "Missing dependency: websocket-client. Install it with:\n"
            " python3 -m pip install -r Tools_scripts_XunFei-Ubuntu/requirements-ubuntu.txt"
        )
        raise SystemExit(install_hint) from exc
    else:
        return websocket
def recv_json(socket: Any) -> dict[str, Any]:
    """Receive one message from *socket* and decode it as JSON.

    Args:
        socket: Object exposing ``recv()`` that returns ``str`` or ``bytes``.

    Returns:
        The parsed JSON document. Byte messages are decoded as UTF-8 first.
    """
    raw = socket.recv()
    text = raw.decode("utf-8") if isinstance(raw, bytes) else raw
    return json.loads(text)
def synthesize_normal(
    *,
    text: str,
    out_file: Path,
    app_id: str,
    api_key: str,
    api_secret: str,
    voice: str,
    speed: int,
    volume: int,
    pitch: int,
) -> None:
    """Synthesize *text* through the normal TTS endpoint and save the audio.

    A single request frame carrying the whole base64-encoded text is sent,
    then response frames are read until one reports ``data.status == 2``.
    All audio chunks are concatenated and written to *out_file*.

    Args:
        text: Narration to speak.
        out_file: Destination file for the returned audio bytes.
        app_id: Application id sent in the ``common`` section.
        api_key: Key used to sign the connection URL.
        api_secret: Secret used to sign the connection URL.
        voice: Value for the ``vcn`` field.
        speed: Value for the ``speed`` field.
        volume: Value for the ``volume`` field.
        pitch: Value for the ``pitch`` field.

    Raises:
        SystemExit: if the websocket-client package is not installed.
        RuntimeError: if a response carries a non-zero ``code`` or no audio
            was received.
    """
    ws_module = require_websocket()
    signed_url = build_auth_url(NORMAL_TTS_URL, api_key, api_secret)
    connection = ws_module.create_connection(signed_url, timeout=30)
    collected = bytearray()
    try:
        business = {
            "aue": "lame",
            "sfl": 1,
            "auf": "audio/L16;rate=16000",
            "vcn": voice,
            "speed": speed,
            "volume": volume,
            "pitch": pitch,
            "bgs": 0,
            "tte": "UTF8",
            "reg": "2",
            "rdn": "0",
        }
        encoded_text = base64.b64encode(text.encode("utf-8")).decode("ascii")
        request = {
            "common": {"app_id": app_id},
            "business": business,
            # status 2: this single frame is also the last frame of the text.
            "data": {"status": 2, "text": encoded_text},
        }
        connection.send(json.dumps(request, ensure_ascii=False, separators=(",", ":")))

        finished = False
        while not finished:
            reply = recv_json(connection)
            if reply.get("code", 0) != 0:
                raise RuntimeError(
                    f"XFYUN normal TTS failed: code={reply.get('code')}, "
                    f"message={reply.get('message')}"
                )
            frame = reply.get("data") or {}
            chunk = frame.get("audio")
            if chunk:
                collected.extend(base64.b64decode(chunk))
            # The server marks the final audio frame with status 2.
            finished = frame.get("status") == 2
    finally:
        connection.close()

    if not collected:
        raise RuntimeError("No audio data returned by XFYUN normal TTS.")
    out_file.write_bytes(collected)
def synthesize_super(
    *,
    text: str,
    out_file: Path,
    app_id: str,
    api_key: str,
    api_secret: str,
    voice: str,
    speed: int,
    volume: int,
    pitch: int,
    raw_text: bool,
) -> None:
    """Synthesize *text* through the super-realistic endpoint and save it.

    One request frame is sent; response frames are read until either the
    header or the audio payload reports ``status == 2``. Audio chunks are
    concatenated and written to *out_file*.

    Args:
        text: Narration to speak.
        out_file: Destination file for the returned audio bytes.
        app_id: Application id sent in the request header.
        api_key: Key used to sign the connection URL.
        api_secret: Secret used to sign the connection URL.
        voice: Value for the ``vcn`` field.
        speed: Value for the ``speed`` field.
        volume: Value for the ``volume`` field.
        pitch: Value for the ``pitch`` field.
        raw_text: Send *text* as-is instead of base64-encoding it.

    Raises:
        SystemExit: if the websocket-client package is not installed.
        RuntimeError: if a response reports a non-zero ``code`` (in its
            header or at top level) or no audio was received.
    """
    ws_module = require_websocket()
    signed_url = build_auth_url(SUPER_TTS_URL, api_key, api_secret)
    connection = ws_module.create_connection(signed_url, timeout=30)
    collected = bytearray()
    if raw_text:
        request_text = text
    else:
        request_text = base64.b64encode(text.encode("utf-8")).decode("ascii")
    try:
        oral_options = {
            "oral_level": "mid",
            "spark_assist": 1,
            "remain": 1,
        }
        audio_format = {
            "encoding": "lame",
            "sample_rate": 24000,
            "channels": 1,
            "bit_depth": 16,
            "frame_size": 0,
        }
        tts_options = {
            "vcn": voice,
            "speed": speed,
            "volume": volume,
            "pitch": pitch,
            "bgs": 0,
            "reg": 0,
            "rdn": 0,
            "rhy": 0,
            "watermask": 0,
            "implicit_watermark": False,
            "audio": audio_format,
        }
        text_payload = {
            "encoding": "utf8",
            "compress": "raw",
            "format": "plain",
            "status": 2,
            "seq": 0,
            "text": request_text,
        }
        request = {
            "header": {"app_id": app_id, "status": 2},
            "parameter": {"oral": oral_options, "tts": tts_options},
            "payload": {"text": text_payload},
        }
        connection.send(json.dumps(request, ensure_ascii=False, separators=(",", ":")))

        while True:
            reply = recv_json(connection)
            reply_header = reply.get("header") or {}
            # Errors may be reported inside the header...
            if reply_header.get("code", 0) != 0:
                raise RuntimeError(
                    f"XFYUN super TTS failed: code={reply_header.get('code')}, "
                    f"message={reply_header.get('message')}, sid={reply_header.get('sid')}"
                )
            # ...or at the top level of the response.
            if reply.get("code", 0) != 0:
                raise RuntimeError(
                    f"XFYUN super TTS failed: code={reply.get('code')}, "
                    f"message={reply.get('message')}"
                )
            audio_part = (reply.get("payload") or {}).get("audio") or {}
            chunk = audio_part.get("audio")
            if chunk:
                collected.extend(base64.b64decode(chunk))
            if reply_header.get("status") == 2 or audio_part.get("status") == 2:
                break
    finally:
        connection.close()

    if not collected:
        raise RuntimeError("No audio data returned by XFYUN super TTS.")
    out_file.write_bytes(collected)
def validate_range(name: str, value: int) -> None:
    """Ensure *value* lies in the inclusive range 0..100.

    Args:
        name: Option name used in the error message.
        value: Number to check.

    Raises:
        ValueError: if *value* is below 0 or above 100.
    """
    too_low = value < 0
    too_high = value > 100
    if too_low or too_high:
        raise ValueError(f"{name} must be between 0 and 100.")
def parse_args() -> argparse.Namespace:
    """Parse the command-line options from ``sys.argv``.

    Returns:
        Namespace with ``script``, ``output_dir``, ``mode``, ``voice``,
        ``speed``, ``volume``, ``pitch``, ``raw_text``, ``overwrite`` and
        ``dry_run`` attributes.
    """
    parser = argparse.ArgumentParser(description="Generate XFYUN TTS audio on Ubuntu.")
    add = parser.add_argument

    add("--script", type=Path, default=None, help="Markdown script path.")
    add("--output-dir", type=Path, default=Path("02_audio/xfyun_tts"))
    add("--mode", choices=["normal", "super"], default="super")
    add("--voice", default=None, help="XFYUN vcn voice name.")

    # Integer tuning knobs; their 0..100 range is checked by the caller.
    for flag, default in (("--speed", 50), ("--volume", 70), ("--pitch", 50)):
        add(flag, type=int, default=default)

    # Boolean switches, all off unless given.
    switches = (
        ("--raw-text", "Use raw text for super TTS."),
        ("--overwrite", "Overwrite existing mp3 files."),
        ("--dry-run", "Only parse script and print plan."),
    )
    for flag, help_text in switches:
        add(flag, action="store_true", help=help_text)

    return parser.parse_args()
def main() -> int:
    """Run the command-line workflow and return the process exit code.

    Steps: parse and validate options, locate and parse the script, print
    the plan, then (unless ``--dry-run``) synthesize one mp3 per segment into
    the output directory, skipping files that already exist unless
    ``--overwrite`` is given.

    Returns:
        ``0`` on success.

    Raises:
        SystemExit: if the ``XF_APPID``, ``XF_APIKEY`` or ``XF_APISECRET``
            environment variable is missing or empty.
    """
    args = parse_args()
    for option in ("speed", "volume", "pitch"):
        validate_range(option, getattr(args, option))

    script_path = args.script or find_default_script(Path.cwd())
    segments = load_segments(script_path)

    # An explicit --voice wins; otherwise each mode has its own default voice.
    if args.voice:
        voice = args.voice
    elif args.mode == "normal":
        voice = "xiaoyan"
    else:
        voice = "x5_lingfeiyi_flow"

    print(f"script={script_path}")
    print(f"mode={args.mode}")
    print(f"voice={voice}")
    print(f"segments={len(segments)}")
    for segment in segments:
        print(f" {segment.index}. {segment.title} ({len(segment.text)} chars)")

    if args.dry_run:
        return 0

    # Credentials are only required once real synthesis is about to start.
    app_id = os.environ.get("XF_APPID")
    api_key = os.environ.get("XF_APIKEY")
    api_secret = os.environ.get("XF_APISECRET")
    if not all((app_id, api_key, api_secret)):
        raise SystemExit("Please set XF_APPID, XF_APIKEY and XF_APISECRET first.")

    args.output_dir.mkdir(parents=True, exist_ok=True)
    use_normal = args.mode == "normal"
    for segment in segments:
        out_file = args.output_dir / f"{segment.index}-{safe_filename(segment.title)}.mp3"
        if out_file.exists() and not args.overwrite:
            print(f"skip existing: {out_file}")
            continue

        print(f"synthesizing {segment.index}: {segment.title}")
        request = {
            "text": segment.text,
            "out_file": out_file,
            "app_id": app_id,
            "api_key": api_key,
            "api_secret": api_secret,
            "voice": voice,
            "speed": args.speed,
            "volume": args.volume,
            "pitch": args.pitch,
        }
        if use_normal:
            synthesize_normal(**request)
        else:
            synthesize_super(**request, raw_text=args.raw_text)
        print(f"generated: {out_file}")

    print("all voice files generated")
    return 0
if __name__ == "__main__":
    # Exit with main()'s return code; Ctrl+C is reported as exit code 130
    # instead of a KeyboardInterrupt traceback.
    try:
        exit_code = main()
    except KeyboardInterrupt:
        exit_code = 130
    raise SystemExit(exit_code)