Files
Pre_Seg_Server/backend/services/demo_media.py
admin b1131c9126 切换演示数据到 demo 目录
- 默认演示视频和 DICOM 路径改为 demo/演视LC视频序列.mp4 与 demo/演视DICOM序列/。

- 演示 DICOM 项目名统一为“演视DICOM序列”,并兼容迁移旧“演示DICOM序列”名称。

- 恢复演示出厂设置测试改为验证新文件名上传路径和新项目名。

- 同步更新用户管理提示、API 契约、安装文档、实现地图和项目指南。

- 本地与 ../Seg_Server_Docker 已实际放入 demo 演示视频和 DICOM 测试影像;数据文件受 .gitignore 保护不进入提交。
2026-05-07 15:58:29 +08:00

131 lines
4.1 KiB
Python

"""Helpers for seeding the bundled demo media project."""
from __future__ import annotations
import os
import shutil
import tempfile
from pathlib import Path
import cv2
from sqlalchemy.orm import Session
from minio_client import upload_file
from models import Frame, Project, User
from services.frame_parser import natural_filename_key, parse_dicom, upload_frames_to_minio
from statuses import PROJECT_STATUS_PENDING, PROJECT_STATUS_READY
DEMO_DICOM_PROJECT_NAME = "演视DICOM序列"
DEMO_DICOM_PARSE_FPS = 30.0
DEMO_VIDEO_PROJECT_NAME = "演视LC视频序列"
LEGACY_DEMO_VIDEO_PROJECT_NAMES = {"Data_MyVideo_1"}
LEGACY_DEMO_DICOM_PROJECT_NAMES = {"演示DICOM序列"}
def demo_dicom_files(dicom_dir: str) -> list[Path]:
"""Return .dcm files in natural file-name order."""
root = Path(dicom_dir)
if not root.exists() or not root.is_dir():
return []
return sorted(
[path for path in root.iterdir() if path.is_file() and path.name.lower().endswith(".dcm")],
key=lambda path: natural_filename_key(path.name),
)
def create_unparsed_video_demo_project(
db: Session,
*,
owner: User,
video_path: str,
project_name: str = DEMO_VIDEO_PROJECT_NAME,
) -> Project:
"""Create the bundled demo video project without extracting frames."""
source = Path(video_path)
if not source.exists() or not source.is_file():
raise FileNotFoundError(f"Demo video not found: {video_path}")
project = Project(
name=project_name,
description="默认演示视频,尚未生成帧",
status=PROJECT_STATUS_PENDING,
source_type="video",
parse_fps=30.0,
original_fps=None,
owner_user_id=owner.id,
)
db.add(project)
db.flush()
data = source.read_bytes()
object_name = f"uploads/{project.id}/{source.name}"
upload_file(object_name, data, content_type="video/mp4", length=len(data))
project.video_path = object_name
project.thumbnail_url = None
db.commit()
db.refresh(project)
return project
def create_parsed_dicom_demo_project(
db: Session,
*,
owner: User,
dicom_dir: str,
project_name: str = DEMO_DICOM_PROJECT_NAME,
) -> Project:
"""Create the demo DICOM project, upload the series, and register parsed frames."""
dcm_files = demo_dicom_files(dicom_dir)
if not dcm_files:
raise FileNotFoundError(f"Demo DICOM series not found: {dicom_dir}")
project = Project(
name=project_name,
description=f"默认演示 DICOM 序列,已按文件名自然顺序生成 {len(dcm_files)}",
status=PROJECT_STATUS_PENDING,
source_type="dicom",
parse_fps=DEMO_DICOM_PARSE_FPS,
original_fps=None,
owner_user_id=owner.id,
)
db.add(project)
db.flush()
dicom_prefix = f"uploads/{project.id}/dicom"
for dcm_file in dcm_files:
data = dcm_file.read_bytes()
upload_file(
f"{dicom_prefix}/{dcm_file.name}",
data,
content_type="application/dicom",
length=len(data),
)
project.video_path = dicom_prefix
tmp_dir = tempfile.mkdtemp(prefix=f"seg_demo_dicom_{project.id}_")
try:
output_dir = os.path.join(tmp_dir, "frames")
frame_files = parse_dicom(dicom_dir, output_dir)
object_names = upload_frames_to_minio(frame_files, project.id)
for idx, obj_name in enumerate(object_names):
image = cv2.imread(frame_files[idx])
height, width = image.shape[:2] if image is not None else (None, None)
db.add(Frame(
project_id=project.id,
frame_index=idx,
image_url=obj_name,
width=width,
height=height,
timestamp_ms=idx * 1000.0 / DEMO_DICOM_PARSE_FPS,
source_frame_number=idx,
))
if object_names:
project.thumbnail_url = object_names[0]
project.status = PROJECT_STATUS_READY
db.commit()
db.refresh(project)
return project
finally:
shutil.rmtree(tmp_dir, ignore_errors=True)