Files
Pre_Seg_Server/backend/routers/admin.py
admin 620e95ff91 修复演示恢复项目帧数据
- 恢复演示出厂设置后直接解析演视LC视频序列并生成可打开帧序列

- 保持演视DICOM序列按文件名自然顺序恢复并生成帧

- 增加 MinIO 浏览器访问端点配置,修复 Docker 部署中封面和帧图预签名地址使用容器内主机名的问题

- 更新管理员恢复测试覆盖视频和 DICOM 帧数量

- 更新 README 和前后端契约/设计/测试文档中的演示恢复说明
2026-05-07 16:17:07 +08:00

300 lines
11 KiB
Python

"""Administrator-only user and audit management endpoints."""
import os
from typing import List
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session
from config import settings
from database import get_db
from models import Annotation, AuditLog, Frame, Mask, ProcessingTask, Project, Template, User
from routers.auth import SUPPORTED_ROLES, ensure_default_admin, hash_password, normalize_user_role, require_admin, write_audit_log
from schemas import (
AdminUserCreate,
AdminUserUpdate,
AuditLogOut,
DemoFactoryResetOut,
DemoFactoryResetRequest,
UserOut,
)
from services.demo_media import (
DEMO_DICOM_PROJECT_NAME,
DEMO_VIDEO_PROJECT_NAME,
create_parsed_dicom_demo_project,
create_parsed_video_demo_project,
demo_dicom_files,
)
from services.default_templates import restore_default_templates
router = APIRouter(prefix="/api/admin", tags=["Admin"])
DEMO_RESET_CONFIRMATION = "RESET_DEMO_FACTORY"
DEMO_PROJECT_NAME = DEMO_DICOM_PROJECT_NAME
def _normalize_role(role: str | None) -> str:
normalized = (role or "annotator").strip().lower()
if normalized not in SUPPORTED_ROLES:
raise HTTPException(status_code=400, detail=f"Unsupported role: {role}")
return normalized
def _assert_non_admin_role(role: str) -> None:
if role == "admin":
raise HTTPException(status_code=400, detail="Only the default admin account can have admin role")
@router.get("/users", response_model=List[UserOut], summary="List users")
def list_users(
db: Session = Depends(get_db),
admin_user: User = Depends(require_admin),
) -> List[User]:
"""Return all users for the administrator console."""
_ = admin_user
users = db.query(User).order_by(User.id).all()
return [normalize_user_role(db, user) for user in users]
@router.post(
"/users",
response_model=UserOut,
status_code=status.HTTP_201_CREATED,
summary="Create user",
)
def create_user(
payload: AdminUserCreate,
db: Session = Depends(get_db),
admin_user: User = Depends(require_admin),
) -> User:
"""Create a user with an initial password and role."""
username = payload.username.strip()
if not username:
raise HTTPException(status_code=400, detail="Username is required")
if len(payload.password) < 6:
raise HTTPException(status_code=400, detail="Password must be at least 6 characters")
role = _normalize_role(payload.role)
_assert_non_admin_role(role)
user = User(
username=username,
password_hash=hash_password(payload.password),
role=role,
is_active=1 if payload.is_active else 0,
)
db.add(user)
try:
db.commit()
except IntegrityError as exc:
db.rollback()
raise HTTPException(status_code=409, detail="Username already exists") from exc
db.refresh(user)
write_audit_log(
db,
actor=admin_user,
action="admin.user_created",
target_type="user",
target_id=user.id,
detail={"username": user.username, "role": user.role, "is_active": bool(user.is_active)},
)
return user
@router.patch("/users/{user_id}", response_model=UserOut, summary="Update user")
def update_user(
user_id: int,
payload: AdminUserUpdate,
db: Session = Depends(get_db),
admin_user: User = Depends(require_admin),
) -> User:
"""Update username, password, role or active state."""
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
user = normalize_user_role(db, user)
updates = payload.model_dump(exclude_unset=True)
audit_detail: dict = {"before": {"username": user.username, "role": user.role, "is_active": bool(user.is_active)}}
if "username" in updates:
username = (updates["username"] or "").strip()
if not username:
raise HTTPException(status_code=400, detail="Username is required")
if user.role == "admin" and username != settings.default_admin_username:
raise HTTPException(status_code=400, detail="Default admin username cannot be changed")
user.username = username
if "password" in updates:
password = updates["password"] or ""
if len(password) < 6:
raise HTTPException(status_code=400, detail="Password must be at least 6 characters")
user.password_hash = hash_password(password)
if "role" in updates:
next_role = _normalize_role(updates["role"])
if user.username == settings.default_admin_username:
if next_role != "admin":
raise HTTPException(status_code=400, detail="Cannot remove the default admin role")
else:
_assert_non_admin_role(next_role)
user.role = next_role
if "is_active" in updates:
if user.id == admin_user.id and not updates["is_active"]:
raise HTTPException(status_code=400, detail="Cannot deactivate yourself")
user.is_active = 1 if updates["is_active"] else 0
try:
db.commit()
except IntegrityError as exc:
db.rollback()
raise HTTPException(status_code=409, detail="Username already exists") from exc
db.refresh(user)
audit_detail["after"] = {"username": user.username, "role": user.role, "is_active": bool(user.is_active)}
audit_detail["password_changed"] = "password" in updates
write_audit_log(
db,
actor=admin_user,
action="admin.user_updated",
target_type="user",
target_id=user.id,
detail=audit_detail,
)
return user
@router.delete("/users/{user_id}", status_code=status.HTTP_204_NO_CONTENT, summary="Delete user")
def delete_user(
user_id: int,
db: Session = Depends(get_db),
admin_user: User = Depends(require_admin),
) -> None:
"""Delete a user when it is safe to remove the account."""
if user_id == admin_user.id:
raise HTTPException(status_code=400, detail="Cannot delete yourself")
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
user = normalize_user_role(db, user)
if user.role == "admin":
raise HTTPException(status_code=400, detail="Cannot delete the default admin account")
username = user.username
db.delete(user)
db.commit()
write_audit_log(
db,
actor=admin_user,
action="admin.user_deleted",
target_type="user",
target_id=user_id,
detail={"username": username},
)
return None
@router.get("/audit-logs", response_model=List[AuditLogOut], summary="List audit logs")
def list_audit_logs(
limit: int = 100,
db: Session = Depends(get_db),
admin_user: User = Depends(require_admin),
) -> List[AuditLog]:
"""Return recent audit events for administrators."""
_ = admin_user
safe_limit = min(max(int(limit or 100), 1), 500)
return db.query(AuditLog).order_by(AuditLog.created_at.desc(), AuditLog.id.desc()).limit(safe_limit).all()
@router.post(
"/demo-factory-reset",
response_model=DemoFactoryResetOut,
summary="Reset demo data to factory defaults",
)
def reset_demo_factory(
payload: DemoFactoryResetRequest,
db: Session = Depends(get_db),
admin_user: User = Depends(require_admin),
) -> dict:
"""Reset a demo deployment to one admin account, the demo video, and the demo DICOM project."""
if payload.confirmation != DEMO_RESET_CONFIRMATION:
raise HTTPException(status_code=400, detail="Invalid reset confirmation")
if not os.path.exists(settings.demo_video_path):
raise HTTPException(
status_code=409,
detail=f"Demo video not found: {settings.demo_video_path}",
)
if not demo_dicom_files(settings.demo_dicom_dir):
raise HTTPException(
status_code=409,
detail=f"Demo DICOM series not found: {settings.demo_dicom_dir}",
)
requested_by = admin_user.username
preserved_admin = ensure_default_admin(db)
preserved_admin.username = settings.default_admin_username
preserved_admin.password_hash = hash_password(settings.default_admin_password)
preserved_admin.role = "admin"
preserved_admin.is_active = 1
db.flush()
deleted_counts = {
"masks": db.query(Mask).delete(synchronize_session=False),
"annotations": db.query(Annotation).delete(synchronize_session=False),
"frames": db.query(Frame).delete(synchronize_session=False),
"tasks": db.query(ProcessingTask).delete(synchronize_session=False),
"projects": db.query(Project).delete(synchronize_session=False),
"user_templates": db.query(Template).filter(Template.owner_user_id.is_not(None)).delete(synchronize_session=False),
"audit_logs": db.query(AuditLog).delete(synchronize_session=False),
"users": db.query(User).filter(User.id != preserved_admin.id).delete(synchronize_session=False),
}
db.flush()
db.expunge_all()
preserved_admin = db.query(User).filter(User.username == settings.default_admin_username).first()
if not preserved_admin:
raise HTTPException(status_code=500, detail="Default admin was not preserved")
restored_templates = restore_default_templates(db)
video_project = create_parsed_video_demo_project(
db,
owner=preserved_admin,
video_path=settings.demo_video_path,
project_name=DEMO_VIDEO_PROJECT_NAME,
)
dicom_project = create_parsed_dicom_demo_project(
db,
owner=preserved_admin,
dicom_dir=settings.demo_dicom_dir,
project_name=DEMO_PROJECT_NAME,
)
db.refresh(preserved_admin)
db.refresh(video_project)
db.refresh(dicom_project)
video_project.frame_count = len(video_project.frames)
dicom_project.frame_count = len(dicom_project.frames)
projects = [video_project, dicom_project]
write_audit_log(
db,
actor=preserved_admin,
action="admin.demo_factory_reset",
target_type="project",
target_id=dicom_project.id,
detail={
"project_names": [project.name for project in projects],
"video_path": video_project.video_path,
"dicom_path": dicom_project.video_path,
"source_types": [project.source_type for project in projects],
"frame_counts": {project.name: len(project.frames) for project in projects},
"deleted_counts": deleted_counts,
"restored_templates": [template.name for template in restored_templates],
"requested_by": requested_by,
},
)
return {
"admin_user": preserved_admin,
"project": dicom_project,
"projects": projects,
"deleted_counts": deleted_counts,
"message": "演示环境已恢复出厂设置",
}