- 打通工作区真实标注闭环:支持手工多边形、矩形、圆形、点区域和线段生成 mask,并可保存、回显、更新和删除后端 annotation。 - 增强 polygon 编辑器:支持顶点拖动、顶点删除、边中点插入、多 polygon 子区域选择编辑,以及区域合并和区域去除。 - 接入 GT mask 导入:后端支持二值/多类别 mask 拆分、contour 转 polygon、distance transform seed point,前端支持导入、回显和 seed point 拖动编辑。 - 完善导出能力:COCO JSON 导出对齐前端,PNG mask ZIP 同时包含单标注 mask、按 zIndex 融合的 semantic_frame 和 semantic_classes.json。 - 打通异步任务管理:新增任务取消、重试、失败详情接口与 Dashboard 控件,worker 支持取消状态检查并通过 Redis/WebSocket 推送 cancelled 事件。 - 对接 Dashboard 后端数据:概览统计、解析队列和实时流转记录从 FastAPI 聚合接口与 WebSocket 更新。 - 增强 AI 推理参数:前端发送 crop_to_prompt、auto_filter_background 和 min_score,后端支持点/框 prompt 局部裁剪推理、结果回映射和负向点/低分过滤。 - 接入 SAM3 基础设施:新增独立 Python 3.12 sam3 环境安装脚本、外部 worker helper、后端桥接和真实 Python/CUDA/包/HF checkpoint access 状态检测。 - 保留 SAM3 授权边界:当前官方 facebook/sam3 gated 权重未授权时状态接口会返回不可用,不伪装成可推理。 - 增强前端状态管理:新增 mask undo/redo 历史栈、AI 模型选择状态、保存状态 dirty/draft/saved 流转和项目状态归一化。 - 更新前端 API 封装:补充 annotation CRUD、GT mask import、mask ZIP export、task cancel/retry/detail、AI runtime status 和 prediction options。 - 更新 UI 控件:ToolsPalette、AISegmentation、VideoWorkspace 和 CanvasArea 接入真实操作、导入导出、撤销重做、任务控制和模型状态。 - 新增 polygon-clipping 依赖,用于前端区域 union/difference 几何运算。 - 完善后端 schemas/status/progress:补充 AI 模型外部状态字段、任务 cancelled 状态和进度事件 payload。 - 补充测试覆盖:新增后端任务控制、SAM3 桥接、GT mask、导出融合、AI options 测试;补充前端 Canvas、Dashboard、VideoWorkspace、ToolsPalette、API 和 store 测试。 - 更新 README、AGENTS 和 doc 文档:冻结当前需求/设计/测试计划,标注真实功能、剩余 Mock、SAM3 授权边界和后续实施顺序。
67 lines
2.1 KiB
Python
67 lines
2.1 KiB
Python
"""Progress event payloads and Redis publication helpers."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
from datetime import datetime, timezone
|
|
from typing import Any
|
|
|
|
from redis_client import get_redis_client
|
|
from statuses import TASK_STATUS_CANCELLED, TASK_STATUS_FAILED, TASK_STATUS_SUCCESS
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
PROGRESS_CHANNEL = "seg:progress"
|
|
|
|
|
|
def _iso_now() -> str:
|
|
return datetime.now(timezone.utc).isoformat()
|
|
|
|
|
|
def _event_type(task_status: str) -> str:
|
|
if task_status == TASK_STATUS_SUCCESS:
|
|
return "complete"
|
|
if task_status == TASK_STATUS_CANCELLED:
|
|
return "cancelled"
|
|
if task_status == TASK_STATUS_FAILED:
|
|
return "error"
|
|
return "progress"
|
|
|
|
|
|
def task_progress_payload(task: Any) -> dict[str, Any]:
|
|
"""Build the WebSocket payload from a persisted processing task."""
|
|
project = getattr(task, "project", None)
|
|
project_name = getattr(project, "name", None)
|
|
status = getattr(task, "status", "")
|
|
updated_at = getattr(task, "updated_at", None)
|
|
timestamp = updated_at.isoformat() if updated_at is not None else _iso_now()
|
|
message = getattr(task, "message", None)
|
|
|
|
return {
|
|
"type": _event_type(status),
|
|
"taskId": f"task-{task.id}",
|
|
"task_id": task.id,
|
|
"project_id": getattr(task, "project_id", None),
|
|
"projectName": project_name,
|
|
"filename": project_name,
|
|
"progress": getattr(task, "progress", 0),
|
|
"status": message or status,
|
|
"message": message,
|
|
"error": getattr(task, "error", None),
|
|
"timestamp": timestamp,
|
|
}
|
|
|
|
|
|
def publish_progress_event(payload: dict[str, Any]) -> None:
|
|
"""Publish a JSON progress event without failing the worker on Redis errors."""
|
|
try:
|
|
get_redis_client().publish(PROGRESS_CHANNEL, json.dumps(payload, ensure_ascii=False))
|
|
except Exception as exc: # noqa: BLE001
|
|
logger.warning("Failed to publish progress event: %s", exc)
|
|
|
|
|
|
def publish_task_progress_event(task: Any) -> None:
|
|
"""Publish a progress event for a ProcessingTask ORM object."""
|
|
publish_progress_event(task_progress_payload(task))
|