Files
Pre_Seg_Server/backend/routers/admin.py
admin cadacef04d 修复演示恢复默认模板覆盖逻辑
- 新增后端默认模板服务,集中维护腹腔镜胆囊切除术和头颈部CT分割的权威分类树、颜色、maskid 和层级定义。

- 演示恢复出厂设置时强制恢复系统默认模板,缺失模板会重建,已修改或删减的默认语义分类树会覆盖回默认状态。

- 清理 main.py 中重复的默认模板定义,让启动 seed 复用同一套服务逻辑,避免后续默认模板定义漂移。

- 扩展管理员恢复出厂设置测试,覆盖头颈部CT模板被改坏和腹腔镜模板缺失后的恢复结果。

- 更新 AGENTS、README 和需求/API/测试/前端审计文档,明确恢复出厂设置会权威恢复系统默认模板。
2026-05-03 17:54:19 +08:00

288 lines
10 KiB
Python

"""Administrator-only user and audit management endpoints."""
import os
from typing import List
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session
from config import settings
from database import get_db
from models import Annotation, AuditLog, Frame, Mask, ProcessingTask, Project, Template, User
from routers.auth import ensure_default_admin, hash_password, require_admin, write_audit_log
from schemas import (
AdminUserCreate,
AdminUserUpdate,
AuditLogOut,
DemoFactoryResetOut,
DemoFactoryResetRequest,
UserOut,
)
from services.demo_media import (
DEMO_DICOM_PROJECT_NAME,
DEMO_VIDEO_PROJECT_NAME,
create_parsed_dicom_demo_project,
create_unparsed_video_demo_project,
demo_dicom_files,
)
from services.default_templates import restore_default_templates
router = APIRouter(prefix="/api/admin", tags=["Admin"])
VALID_ROLES = {"admin", "annotator", "viewer"}
DEMO_RESET_CONFIRMATION = "RESET_DEMO_FACTORY"
DEMO_PROJECT_NAME = DEMO_DICOM_PROJECT_NAME
def _normalize_role(role: str | None) -> str:
normalized = (role or "annotator").strip().lower()
if normalized not in VALID_ROLES:
raise HTTPException(status_code=400, detail=f"Unsupported role: {role}")
return normalized
@router.get("/users", response_model=List[UserOut], summary="List users")
def list_users(
db: Session = Depends(get_db),
admin_user: User = Depends(require_admin),
) -> List[User]:
"""Return all users for the administrator console."""
_ = admin_user
return db.query(User).order_by(User.id).all()
@router.post(
"/users",
response_model=UserOut,
status_code=status.HTTP_201_CREATED,
summary="Create user",
)
def create_user(
payload: AdminUserCreate,
db: Session = Depends(get_db),
admin_user: User = Depends(require_admin),
) -> User:
"""Create a user with an initial password and role."""
username = payload.username.strip()
if not username:
raise HTTPException(status_code=400, detail="Username is required")
if len(payload.password) < 6:
raise HTTPException(status_code=400, detail="Password must be at least 6 characters")
user = User(
username=username,
password_hash=hash_password(payload.password),
role=_normalize_role(payload.role),
is_active=1 if payload.is_active else 0,
)
db.add(user)
try:
db.commit()
except IntegrityError as exc:
db.rollback()
raise HTTPException(status_code=409, detail="Username already exists") from exc
db.refresh(user)
write_audit_log(
db,
actor=admin_user,
action="admin.user_created",
target_type="user",
target_id=user.id,
detail={"username": user.username, "role": user.role, "is_active": bool(user.is_active)},
)
return user
@router.patch("/users/{user_id}", response_model=UserOut, summary="Update user")
def update_user(
user_id: int,
payload: AdminUserUpdate,
db: Session = Depends(get_db),
admin_user: User = Depends(require_admin),
) -> User:
"""Update username, password, role or active state."""
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
updates = payload.model_dump(exclude_unset=True)
audit_detail: dict = {"before": {"username": user.username, "role": user.role, "is_active": bool(user.is_active)}}
if "username" in updates:
username = (updates["username"] or "").strip()
if not username:
raise HTTPException(status_code=400, detail="Username is required")
user.username = username
if "password" in updates:
password = updates["password"] or ""
if len(password) < 6:
raise HTTPException(status_code=400, detail="Password must be at least 6 characters")
user.password_hash = hash_password(password)
if "role" in updates:
next_role = _normalize_role(updates["role"])
if user.id == admin_user.id and next_role != "admin":
raise HTTPException(status_code=400, detail="Cannot remove your own admin role")
user.role = next_role
if "is_active" in updates:
if user.id == admin_user.id and not updates["is_active"]:
raise HTTPException(status_code=400, detail="Cannot deactivate yourself")
user.is_active = 1 if updates["is_active"] else 0
try:
db.commit()
except IntegrityError as exc:
db.rollback()
raise HTTPException(status_code=409, detail="Username already exists") from exc
db.refresh(user)
audit_detail["after"] = {"username": user.username, "role": user.role, "is_active": bool(user.is_active)}
audit_detail["password_changed"] = "password" in updates
write_audit_log(
db,
actor=admin_user,
action="admin.user_updated",
target_type="user",
target_id=user.id,
detail=audit_detail,
)
return user
@router.delete("/users/{user_id}", status_code=status.HTTP_204_NO_CONTENT, summary="Delete user")
def delete_user(
user_id: int,
db: Session = Depends(get_db),
admin_user: User = Depends(require_admin),
) -> None:
"""Delete a user when it is safe to remove the account."""
if user_id == admin_user.id:
raise HTTPException(status_code=400, detail="Cannot delete yourself")
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
owned_project_count = db.query(Project).filter(Project.owner_user_id == user_id).count()
if owned_project_count:
raise HTTPException(status_code=409, detail="User owns projects; deactivate or migrate projects first")
username = user.username
db.delete(user)
db.commit()
write_audit_log(
db,
actor=admin_user,
action="admin.user_deleted",
target_type="user",
target_id=user_id,
detail={"username": username},
)
return None
@router.get("/audit-logs", response_model=List[AuditLogOut], summary="List audit logs")
def list_audit_logs(
limit: int = 100,
db: Session = Depends(get_db),
admin_user: User = Depends(require_admin),
) -> List[AuditLog]:
"""Return recent audit events for administrators."""
_ = admin_user
safe_limit = min(max(int(limit or 100), 1), 500)
return db.query(AuditLog).order_by(AuditLog.created_at.desc(), AuditLog.id.desc()).limit(safe_limit).all()
@router.post(
"/demo-factory-reset",
response_model=DemoFactoryResetOut,
summary="Reset demo data to factory defaults",
)
def reset_demo_factory(
payload: DemoFactoryResetRequest,
db: Session = Depends(get_db),
admin_user: User = Depends(require_admin),
) -> dict:
"""Reset a demo deployment to one admin account, the demo video, and the demo DICOM project."""
if payload.confirmation != DEMO_RESET_CONFIRMATION:
raise HTTPException(status_code=400, detail="Invalid reset confirmation")
if not os.path.exists(settings.demo_video_path):
raise HTTPException(
status_code=409,
detail=f"Demo video not found: {settings.demo_video_path}",
)
if not demo_dicom_files(settings.demo_dicom_dir):
raise HTTPException(
status_code=409,
detail=f"Demo DICOM series not found: {settings.demo_dicom_dir}",
)
requested_by = admin_user.username
preserved_admin = ensure_default_admin(db)
preserved_admin.username = settings.default_admin_username
preserved_admin.password_hash = hash_password(settings.default_admin_password)
preserved_admin.role = "admin"
preserved_admin.is_active = 1
db.flush()
deleted_counts = {
"masks": db.query(Mask).delete(synchronize_session=False),
"annotations": db.query(Annotation).delete(synchronize_session=False),
"frames": db.query(Frame).delete(synchronize_session=False),
"tasks": db.query(ProcessingTask).delete(synchronize_session=False),
"projects": db.query(Project).delete(synchronize_session=False),
"user_templates": db.query(Template).filter(Template.owner_user_id.is_not(None)).delete(synchronize_session=False),
"audit_logs": db.query(AuditLog).delete(synchronize_session=False),
"users": db.query(User).filter(User.id != preserved_admin.id).delete(synchronize_session=False),
}
db.flush()
db.expunge_all()
preserved_admin = db.query(User).filter(User.username == settings.default_admin_username).first()
if not preserved_admin:
raise HTTPException(status_code=500, detail="Default admin was not preserved")
restored_templates = restore_default_templates(db)
video_project = create_unparsed_video_demo_project(
db,
owner=preserved_admin,
video_path=settings.demo_video_path,
project_name=DEMO_VIDEO_PROJECT_NAME,
)
video_project.frame_count = 0
dicom_project = create_parsed_dicom_demo_project(
db,
owner=preserved_admin,
dicom_dir=settings.demo_dicom_dir,
project_name=DEMO_PROJECT_NAME,
)
db.refresh(preserved_admin)
db.refresh(video_project)
db.refresh(dicom_project)
video_project.frame_count = len(video_project.frames)
dicom_project.frame_count = len(dicom_project.frames)
projects = [video_project, dicom_project]
write_audit_log(
db,
actor=preserved_admin,
action="admin.demo_factory_reset",
target_type="project",
target_id=dicom_project.id,
detail={
"project_names": [project.name for project in projects],
"video_path": video_project.video_path,
"dicom_path": dicom_project.video_path,
"source_types": [project.source_type for project in projects],
"frame_counts": {project.name: len(project.frames) for project in projects},
"deleted_counts": deleted_counts,
"restored_templates": [template.name for template in restored_templates],
"requested_by": requested_by,
},
)
return {
"admin_user": preserved_admin,
"project": dicom_project,
"projects": projects,
"deleted_counts": deleted_counts,
"message": "演示环境已恢复出厂设置",
}