- 登录页和侧栏统一使用根目录 logo_square.png,并更新登录系统名称与副标题。
- 更新 Dashboard、项目库和工作区时间轴文案,移除底层时序视频图层说明。
- 演示视频项目显示名改为“演视LC视频序列”,启动时兼容迁移旧 Data_MyVideo_1 名称,恢复出厂设置使用新名。
- 调整侧栏用户管理入口为用户图标,底部当前用户入口为退出图标,并让退出提示不接收鼠标事件。
- 补充前端组件测试、后端演示重置测试和文档说明。
130 lines
4.0 KiB
Python
130 lines
4.0 KiB
Python
"""Helpers for seeding the bundled demo media project."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import shutil
|
|
import tempfile
|
|
from pathlib import Path
|
|
|
|
import cv2
|
|
from sqlalchemy.orm import Session
|
|
|
|
from minio_client import upload_file
|
|
from models import Frame, Project, User
|
|
from services.frame_parser import natural_filename_key, parse_dicom, upload_frames_to_minio
|
|
from statuses import PROJECT_STATUS_PENDING, PROJECT_STATUS_READY
|
|
|
|
# Default project name used when seeding the demo DICOM project.
DEMO_DICOM_PROJECT_NAME: str = "演示DICOM序列"
# Frame rate recorded as the demo DICOM project's parse_fps; it also spaces
# the per-frame timestamps (timestamp_ms = index * 1000 / fps).
DEMO_DICOM_PARSE_FPS: float = 30.0
# Default project name used when seeding the demo video project.
DEMO_VIDEO_PROJECT_NAME: str = "演视LC视频序列"
# Earlier name(s) of the demo video project. Nothing in this section reads
# this set; presumably a startup migration elsewhere renames matching
# projects to DEMO_VIDEO_PROJECT_NAME -- TODO confirm against the caller.
LEGACY_DEMO_VIDEO_PROJECT_NAMES: set[str] = {"Data_MyVideo_1"}
|
|
|
|
|
|
def demo_dicom_files(dicom_dir: str) -> list[Path]:
    """List the ``.dcm`` files directly inside *dicom_dir* in natural order.

    Only regular files whose name ends with ``.dcm`` (case-insensitive) are
    considered; sub-directories are not descended into.

    Args:
        dicom_dir: Directory expected to hold the demo DICOM series.

    Returns:
        The matching paths sorted with ``natural_filename_key`` applied to
        each file name, or an empty list when *dicom_dir* is missing or is
        not a directory.
    """
    directory = Path(dicom_dir)
    if not (directory.exists() and directory.is_dir()):
        return []

    matches = []
    for entry in directory.iterdir():
        # Match on the full name rather than Path.suffix so a file literally
        # named ".dcm" is treated the same way as before.
        if entry.is_file() and entry.name.lower().endswith(".dcm"):
            matches.append(entry)

    matches.sort(key=lambda entry: natural_filename_key(entry.name))
    return matches
|
|
|
|
|
|
def create_unparsed_video_demo_project(
    db: Session,
    *,
    owner: User,
    video_path: str,
    project_name: str = DEMO_VIDEO_PROJECT_NAME,
) -> Project:
    """Seed the demo video project and upload its raw file, leaving it unparsed.

    The project row is created in the pending state with no thumbnail, and
    the video is stored under ``uploads/<project id>/<file name>``. No frames
    are extracted here.

    Args:
        db: Session used to persist the project.
        owner: User recorded as the project's owner.
        video_path: Local path of the demo video file.
        project_name: Name stored on the project.

    Returns:
        The committed and refreshed ``Project``.

    Raises:
        FileNotFoundError: If *video_path* does not exist or is not a file.
    """
    video_file = Path(video_path)
    if not (video_file.exists() and video_file.is_file()):
        raise FileNotFoundError(f"Demo video not found: {video_path}")

    project_fields = {
        "name": project_name,
        "description": "默认演示视频,尚未生成帧",
        "status": PROJECT_STATUS_PENDING,
        "source_type": "video",
        "parse_fps": 30.0,
        "original_fps": None,
        "owner_user_id": owner.id,
    }
    demo_project = Project(**project_fields)
    db.add(demo_project)
    # Flush before building the object key: the key embeds the project id.
    db.flush()

    storage_key = f"uploads/{demo_project.id}/{video_file.name}"
    payload = video_file.read_bytes()
    upload_file(
        storage_key,
        payload,
        content_type="video/mp4",
        length=len(payload),
    )

    demo_project.video_path = storage_key
    demo_project.thumbnail_url = None
    db.commit()
    db.refresh(demo_project)
    return demo_project
|
|
|
|
|
|
def create_parsed_dicom_demo_project(
    db: Session,
    *,
    owner: User,
    dicom_dir: str,
    project_name: str = DEMO_DICOM_PROJECT_NAME,
) -> Project:
    """Seed the demo DICOM project with its source files and parsed frames.

    Steps, in order:
      1. Create the project row in the pending state and flush it.
      2. Upload every ``.dcm`` file found by ``demo_dicom_files`` under
         ``uploads/<project id>/dicom``.
      3. Run ``parse_dicom`` into a temporary directory and upload the
         resulting frames with ``upload_frames_to_minio``.
      4. Add one ``Frame`` row per uploaded frame, mark the project ready
         and commit.

    The temporary directory is removed whether or not the steps succeed.

    Args:
        db: Session used to persist the project and its frames.
        owner: User recorded as the project's owner.
        dicom_dir: Local directory holding the demo DICOM series.
        project_name: Name stored on the project.

    Returns:
        The committed and refreshed ``Project``.

    Raises:
        FileNotFoundError: If *dicom_dir* yields no ``.dcm`` files.
    """
    series_files = demo_dicom_files(dicom_dir)
    if not series_files:
        raise FileNotFoundError(f"Demo DICOM series not found: {dicom_dir}")

    summary = f"默认演示 DICOM 序列,已按文件名自然顺序生成 {len(series_files)} 帧"
    demo_project = Project(
        name=project_name,
        description=summary,
        status=PROJECT_STATUS_PENDING,
        source_type="dicom",
        parse_fps=DEMO_DICOM_PARSE_FPS,
        original_fps=None,
        owner_user_id=owner.id,
    )
    db.add(demo_project)
    # Flush before building object keys: they embed the project id.
    db.flush()

    storage_prefix = f"uploads/{demo_project.id}/dicom"
    for series_file in series_files:
        payload = series_file.read_bytes()
        upload_file(
            f"{storage_prefix}/{series_file.name}",
            payload,
            content_type="application/dicom",
            length=len(payload),
        )
    demo_project.video_path = storage_prefix

    scratch_dir = tempfile.mkdtemp(prefix=f"seg_demo_dicom_{demo_project.id}_")
    try:
        frames_dir = os.path.join(scratch_dir, "frames")
        local_frames = parse_dicom(dicom_dir, frames_dir)
        uploaded_names = upload_frames_to_minio(local_frames, demo_project.id)

        for position, uploaded_name in enumerate(uploaded_names):
            # Read the frame back only for its dimensions; cv2.imread yields
            # None when it cannot decode the file, in which case the size is
            # stored as unknown.
            decoded = cv2.imread(local_frames[position])
            if decoded is not None:
                frame_height, frame_width = decoded.shape[:2]
            else:
                frame_height, frame_width = (None, None)

            frame_row = Frame(
                project_id=demo_project.id,
                frame_index=position,
                image_url=uploaded_name,
                width=frame_width,
                height=frame_height,
                timestamp_ms=position * 1000.0 / DEMO_DICOM_PARSE_FPS,
                source_frame_number=position,
            )
            db.add(frame_row)

        # The first uploaded frame doubles as the project thumbnail.
        if uploaded_names:
            demo_project.thumbnail_url = uploaded_names[0]
        demo_project.status = PROJECT_STATUS_READY
        db.commit()
        db.refresh(demo_project)
        return demo_project
    finally:
        shutil.rmtree(scratch_dir, ignore_errors=True)
|