Files
Pre_Seg_Server/backend/services/demo_media.py
admin 620e95ff91 修复演示恢复项目帧数据
- 恢复演示出厂设置后直接解析演视LC视频序列并生成可打开帧序列

- 保持演视DICOM序列按文件名自然顺序恢复并生成帧

- 增加 MinIO 浏览器访问端点配置,修复 Docker 部署中封面和帧图预签名地址使用容器内主机名的问题

- 更新管理员恢复测试覆盖视频和 DICOM 帧数量

- 更新 README 和前后端契约/设计/测试文档中的演示恢复说明
2026-05-07 16:17:07 +08:00

218 lines
6.9 KiB
Python

"""Helpers for seeding the bundled demo media project."""
from __future__ import annotations
import os
import shutil
import tempfile
from pathlib import Path
import cv2
from sqlalchemy.orm import Session
from minio_client import upload_file
from models import Frame, Project, User
from services.frame_parser import (
extract_thumbnail,
natural_filename_key,
parse_dicom,
parse_video,
upload_frames_to_minio,
)
from statuses import PROJECT_STATUS_PENDING, PROJECT_STATUS_READY
DEMO_DICOM_PROJECT_NAME = "演视DICOM序列"
DEMO_DICOM_PARSE_FPS = 30.0
DEMO_VIDEO_PROJECT_NAME = "演视LC视频序列"
DEMO_VIDEO_PARSE_FPS = 30.0
DEMO_VIDEO_TARGET_WIDTH = 640
LEGACY_DEMO_VIDEO_PROJECT_NAMES = {"Data_MyVideo_1"}
LEGACY_DEMO_DICOM_PROJECT_NAMES = {"演示DICOM序列"}
def demo_dicom_files(dicom_dir: str) -> list[Path]:
"""Return .dcm files in natural file-name order."""
root = Path(dicom_dir)
if not root.exists() or not root.is_dir():
return []
return sorted(
[path for path in root.iterdir() if path.is_file() and path.name.lower().endswith(".dcm")],
key=lambda path: natural_filename_key(path.name),
)
def create_unparsed_video_demo_project(
db: Session,
*,
owner: User,
video_path: str,
project_name: str = DEMO_VIDEO_PROJECT_NAME,
) -> Project:
"""Create the bundled demo video project without extracting frames."""
source = Path(video_path)
if not source.exists() or not source.is_file():
raise FileNotFoundError(f"Demo video not found: {video_path}")
project = Project(
name=project_name,
description="默认演示视频,尚未生成帧",
status=PROJECT_STATUS_PENDING,
source_type="video",
parse_fps=30.0,
original_fps=None,
owner_user_id=owner.id,
)
db.add(project)
db.flush()
data = source.read_bytes()
object_name = f"uploads/{project.id}/{source.name}"
upload_file(object_name, data, content_type="video/mp4", length=len(data))
project.video_path = object_name
project.thumbnail_url = None
db.commit()
db.refresh(project)
return project
def create_parsed_video_demo_project(
db: Session,
*,
owner: User,
video_path: str,
project_name: str = DEMO_VIDEO_PROJECT_NAME,
) -> Project:
"""Create the bundled demo video project and register its extracted frame sequence."""
source = Path(video_path)
if not source.exists() or not source.is_file():
raise FileNotFoundError(f"Demo video not found: {video_path}")
project = Project(
name=project_name,
description="默认演示视频,已生成帧",
status=PROJECT_STATUS_PENDING,
source_type="video",
parse_fps=DEMO_VIDEO_PARSE_FPS,
original_fps=None,
owner_user_id=owner.id,
)
db.add(project)
db.flush()
data = source.read_bytes()
object_name = f"uploads/{project.id}/{source.name}"
upload_file(object_name, data, content_type="video/mp4", length=len(data))
project.video_path = object_name
tmp_dir = tempfile.mkdtemp(prefix=f"seg_demo_video_{project.id}_")
try:
output_dir = os.path.join(tmp_dir, "frames")
frame_files, original_fps = parse_video(
str(source),
output_dir,
fps=int(DEMO_VIDEO_PARSE_FPS),
target_width=DEMO_VIDEO_TARGET_WIDTH,
)
project.original_fps = original_fps
object_names = upload_frames_to_minio(frame_files, project.id)
for idx, obj_name in enumerate(object_names):
image = cv2.imread(frame_files[idx])
height, width = image.shape[:2] if image is not None else (None, None)
db.add(Frame(
project_id=project.id,
frame_index=idx,
image_url=obj_name,
width=width,
height=height,
timestamp_ms=idx * 1000.0 / DEMO_VIDEO_PARSE_FPS,
source_frame_number=idx,
))
thumbnail_path = os.path.join(tmp_dir, "thumbnail.jpg")
try:
extract_thumbnail(str(source), thumbnail_path)
with open(thumbnail_path, "rb") as thumbnail_file:
thumbnail_data = thumbnail_file.read()
thumbnail_object = f"projects/{project.id}/thumbnail.jpg"
upload_file(
thumbnail_object,
thumbnail_data,
content_type="image/jpeg",
length=len(thumbnail_data),
)
project.thumbnail_url = thumbnail_object
except Exception: # noqa: BLE001
if object_names:
project.thumbnail_url = object_names[0]
project.status = PROJECT_STATUS_READY
db.commit()
db.refresh(project)
return project
finally:
shutil.rmtree(tmp_dir, ignore_errors=True)
def create_parsed_dicom_demo_project(
db: Session,
*,
owner: User,
dicom_dir: str,
project_name: str = DEMO_DICOM_PROJECT_NAME,
) -> Project:
"""Create the demo DICOM project, upload the series, and register parsed frames."""
dcm_files = demo_dicom_files(dicom_dir)
if not dcm_files:
raise FileNotFoundError(f"Demo DICOM series not found: {dicom_dir}")
project = Project(
name=project_name,
description=f"默认演示 DICOM 序列,已按文件名自然顺序生成 {len(dcm_files)}",
status=PROJECT_STATUS_PENDING,
source_type="dicom",
parse_fps=DEMO_DICOM_PARSE_FPS,
original_fps=None,
owner_user_id=owner.id,
)
db.add(project)
db.flush()
dicom_prefix = f"uploads/{project.id}/dicom"
for dcm_file in dcm_files:
data = dcm_file.read_bytes()
upload_file(
f"{dicom_prefix}/{dcm_file.name}",
data,
content_type="application/dicom",
length=len(data),
)
project.video_path = dicom_prefix
tmp_dir = tempfile.mkdtemp(prefix=f"seg_demo_dicom_{project.id}_")
try:
output_dir = os.path.join(tmp_dir, "frames")
frame_files = parse_dicom(dicom_dir, output_dir)
object_names = upload_frames_to_minio(frame_files, project.id)
for idx, obj_name in enumerate(object_names):
image = cv2.imread(frame_files[idx])
height, width = image.shape[:2] if image is not None else (None, None)
db.add(Frame(
project_id=project.id,
frame_index=idx,
image_url=obj_name,
width=width,
height=height,
timestamp_ms=idx * 1000.0 / DEMO_DICOM_PARSE_FPS,
source_frame_number=idx,
))
if object_names:
project.thumbnail_url = object_names[0]
project.status = PROJECT_STATUS_READY
db.commit()
db.refresh(project)
return project
finally:
shutil.rmtree(tmp_dir, ignore_errors=True)