Files
Pre_Seg_Server/backend/routers/tasks.py
admin 523beeb446 收敛用户角色并共享项目库
- 后端限制系统只保留默认 admin 管理员,新建用户固定为标注员,并拒绝观察员或额外管理员角色。

- 将项目、帧、媒体解析、AI 标注、任务、Dashboard 和导出接口改为共享项目库访问,标注员具备同等项目管理和标注能力。

- 前端用户管理移除角色选择和观察员入口,只展示唯一管理员与标注员状态。

- 更新后端/前端测试,覆盖唯一 admin、旧 viewer 归一为标注员、用户删除和共享项目库访问。

- 同步更新 AGENTS 与 doc 文档中的角色权限、共享项目库和测试计划说明。
2026-05-04 05:20:28 +08:00

162 lines
5.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Processing task query endpoints."""
import logging
from datetime import datetime, timezone
from typing import List
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from celery_app import celery_app
from database import get_db
from models import ProcessingTask, Project, User
from progress_events import publish_task_progress_event
from routers.auth import get_current_user, require_editor
from schemas import ProcessingTaskOut
from statuses import (
PROJECT_STATUS_PARSING,
PROJECT_STATUS_PENDING,
PROJECT_STATUS_READY,
TASK_ACTIVE_STATUSES,
TASK_STATUS_CANCELLED,
TASK_STATUS_FAILED,
TASK_STATUS_QUEUED,
)
from worker_tasks import parse_project_media, propagate_project_masks
router = APIRouter(prefix="/api/tasks", tags=["Tasks"])
logger = logging.getLogger(__name__)
def _now() -> datetime:
return datetime.now(timezone.utc)
def _get_task_or_404(task_id: int, db: Session, current_user: User) -> ProcessingTask:
_ = current_user
task = (
db.query(ProcessingTask)
.outerjoin(Project, Project.id == ProcessingTask.project_id)
.filter(ProcessingTask.id == task_id)
.first()
)
if not task:
raise HTTPException(status_code=404, detail="Task not found")
return task
def _project_status_after_stop(project: Project) -> str:
return PROJECT_STATUS_READY if project.frames else PROJECT_STATUS_PENDING
@router.get("", response_model=List[ProcessingTaskOut], summary="List processing tasks")
def list_tasks(
project_id: int | None = None,
status: str | None = None,
limit: int = 50,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
) -> List[ProcessingTask]:
"""Return recent background processing tasks."""
_ = current_user
query = db.query(ProcessingTask).outerjoin(Project, Project.id == ProcessingTask.project_id)
if project_id is not None:
query = query.filter(ProcessingTask.project_id == project_id)
if status is not None:
query = query.filter(ProcessingTask.status == status)
return query.order_by(ProcessingTask.created_at.desc()).limit(limit).all()
@router.get("/{task_id}", response_model=ProcessingTaskOut, summary="Get processing task")
def get_task(
task_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
) -> ProcessingTask:
"""Return one background task by id."""
return _get_task_or_404(task_id, db, current_user)
@router.post("/{task_id}/cancel", response_model=ProcessingTaskOut, summary="Cancel processing task")
def cancel_task(
task_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(require_editor),
) -> ProcessingTask:
"""Cancel a queued/running background task and revoke the Celery job when possible."""
task = _get_task_or_404(task_id, db, current_user)
if task.status not in TASK_ACTIVE_STATUSES:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"Task is not cancellable in status: {task.status}",
)
if task.celery_task_id:
try:
celery_app.control.revoke(task.celery_task_id, terminate=True, signal="SIGTERM")
except Exception as exc: # noqa: BLE001
logger.warning("Failed to revoke celery task %s: %s", task.celery_task_id, exc)
task.status = TASK_STATUS_CANCELLED
task.progress = 100
task.message = "任务已取消"
task.error = "Cancelled by user"
task.finished_at = _now()
if task.project:
task.project.status = _project_status_after_stop(task.project)
db.commit()
db.refresh(task)
publish_task_progress_event(task)
return task
@router.post("/{task_id}/retry", response_model=ProcessingTaskOut, status_code=status.HTTP_202_ACCEPTED, summary="Retry processing task")
def retry_task(
task_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(require_editor),
) -> ProcessingTask:
"""Create a fresh queued task from a failed or cancelled task."""
previous = _get_task_or_404(task_id, db, current_user)
if previous.status not in {TASK_STATUS_FAILED, TASK_STATUS_CANCELLED}:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"Task is not retryable in status: {previous.status}",
)
if previous.project_id is None:
raise HTTPException(status_code=400, detail="Task has no project_id")
project = db.query(Project).filter(Project.id == previous.project_id).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
is_propagation_task = previous.task_type == "propagate_masks"
if not is_propagation_task and not project.video_path:
raise HTTPException(status_code=400, detail="Project has no media uploaded")
payload = dict(previous.payload or {})
payload.setdefault("source_type", project.source_type or "video")
payload["retry_of"] = previous.id
task = ProcessingTask(
task_type=previous.task_type,
status=TASK_STATUS_QUEUED,
progress=0,
message=f"重试任务已入队(源任务 #{previous.id}",
project_id=project.id,
payload=payload,
)
if not is_propagation_task:
project.status = PROJECT_STATUS_PARSING
db.add(task)
db.commit()
db.refresh(task)
publish_task_progress_event(task)
async_result = propagate_project_masks.delay(task.id) if is_propagation_task else parse_project_media.delay(task.id)
task.celery_task_id = async_result.id
db.commit()
db.refresh(task)
publish_task_progress_event(task)
return task