- 打通工作区真实标注闭环:支持手工多边形、矩形、圆形、点区域和线段生成 mask,并可保存、回显、更新和删除后端 annotation。 - 增强 polygon 编辑器:支持顶点拖动、顶点删除、边中点插入、多 polygon 子区域选择编辑,以及区域合并和区域去除。 - 接入 GT mask 导入:后端支持二值/多类别 mask 拆分、contour 转 polygon、distance transform seed point,前端支持导入、回显和 seed point 拖动编辑。 - 完善导出能力:COCO JSON 导出对齐前端,PNG mask ZIP 同时包含单标注 mask、按 zIndex 融合的 semantic_frame 和 semantic_classes.json。 - 打通异步任务管理:新增任务取消、重试、失败详情接口与 Dashboard 控件,worker 支持取消状态检查并通过 Redis/WebSocket 推送 cancelled 事件。 - 对接 Dashboard 后端数据:概览统计、解析队列和实时流转记录从 FastAPI 聚合接口与 WebSocket 更新。 - 增强 AI 推理参数:前端发送 crop_to_prompt、auto_filter_background 和 min_score,后端支持点/框 prompt 局部裁剪推理、结果回映射和负向点/低分过滤。 - 接入 SAM3 基础设施:新增独立 Python 3.12 sam3 环境安装脚本、外部 worker helper、后端桥接和真实 Python/CUDA/包/HF checkpoint access 状态检测。 - 保留 SAM3 授权边界:当前官方 facebook/sam3 gated 权重未授权时状态接口会返回不可用,不伪装成可推理。 - 增强前端状态管理:新增 mask undo/redo 历史栈、AI 模型选择状态、保存状态 dirty/draft/saved 流转和项目状态归一化。 - 更新前端 API 封装:补充 annotation CRUD、GT mask import、mask ZIP export、task cancel/retry/detail、AI runtime status 和 prediction options。 - 更新 UI 控件:ToolsPalette、AISegmentation、VideoWorkspace 和 CanvasArea 接入真实操作、导入导出、撤销重做、任务控制和模型状态。 - 新增 polygon-clipping 依赖,用于前端区域 union/difference 几何运算。 - 完善后端 schemas/status/progress:补充 AI 模型外部状态字段、任务 cancelled 状态和进度事件 payload。 - 补充测试覆盖:新增后端任务控制、SAM3 桥接、GT mask、导出融合、AI options 测试;补充前端 Canvas、Dashboard、VideoWorkspace、ToolsPalette、API 和 store 测试。 - 更新 README、AGENTS 和 doc 文档:冻结当前需求/设计/测试计划,标注真实功能、剩余 Mock、SAM3 授权边界和后续实施顺序。
287 lines
9.1 KiB
Python
287 lines
9.1 KiB
Python
"""Annotation export endpoints (COCO, PNG masks)."""
|
|
|
|
import io
|
|
import json
|
|
import logging
|
|
import os
|
|
import zipfile
|
|
from datetime import datetime
|
|
from typing import Any, Dict, List
|
|
|
|
import numpy as np
|
|
from fastapi import APIRouter, Depends, HTTPException, status
|
|
from fastapi.responses import StreamingResponse
|
|
from sqlalchemy.orm import Session
|
|
|
|
from database import get_db
|
|
from models import Project, Annotation, Frame, Template
|
|
|
|
logger = logging.getLogger(__name__)
|
|
router = APIRouter(prefix="/api/export", tags=["Export"])
|
|
|
|
|
|
def _mask_from_polygon(
|
|
polygon: List[List[float]],
|
|
width: int,
|
|
height: int,
|
|
) -> np.ndarray:
|
|
"""Render a normalized polygon to a binary mask."""
|
|
import cv2
|
|
|
|
pts = np.array(
|
|
[[int(p[0] * width), int(p[1] * height)] for p in polygon],
|
|
dtype=np.int32,
|
|
)
|
|
mask = np.zeros((height, width), dtype=np.uint8)
|
|
cv2.fillPoly(mask, [pts], 255)
|
|
return mask
|
|
|
|
|
|
def _annotation_z_index(annotation: Annotation) -> int:
|
|
class_meta = (annotation.mask_data or {}).get("class") or {}
|
|
if isinstance(class_meta, dict) and class_meta.get("zIndex") is not None:
|
|
try:
|
|
return int(class_meta["zIndex"])
|
|
except (TypeError, ValueError):
|
|
pass
|
|
if annotation.template and annotation.template.z_index is not None:
|
|
return int(annotation.template.z_index)
|
|
return 0
|
|
|
|
|
|
def _annotation_class_key(annotation: Annotation) -> str:
|
|
class_meta = (annotation.mask_data or {}).get("class") or {}
|
|
if isinstance(class_meta, dict):
|
|
if class_meta.get("id"):
|
|
return f"class:{class_meta['id']}"
|
|
if class_meta.get("name"):
|
|
return f"name:{class_meta['name']}"
|
|
if annotation.template_id:
|
|
return f"template:{annotation.template_id}"
|
|
return f"annotation:{annotation.id}"
|
|
|
|
|
|
def _annotation_label(annotation: Annotation) -> str:
|
|
mask_data = annotation.mask_data or {}
|
|
class_meta = mask_data.get("class") or {}
|
|
if isinstance(class_meta, dict) and class_meta.get("name"):
|
|
return str(class_meta["name"])
|
|
if mask_data.get("label"):
|
|
return str(mask_data["label"])
|
|
if annotation.template and annotation.template.name:
|
|
return str(annotation.template.name)
|
|
return f"Annotation {annotation.id}"
|
|
|
|
|
|
def _annotation_color(annotation: Annotation) -> str:
|
|
mask_data = annotation.mask_data or {}
|
|
class_meta = mask_data.get("class") or {}
|
|
if isinstance(class_meta, dict) and class_meta.get("color"):
|
|
return str(class_meta["color"])
|
|
if mask_data.get("color"):
|
|
return str(mask_data["color"])
|
|
if annotation.template and annotation.template.color:
|
|
return str(annotation.template.color)
|
|
return "#ffffff"
|
|
|
|
|
|
@router.get(
|
|
"/{project_id}/coco",
|
|
summary="Export annotations in COCO format",
|
|
)
|
|
def export_coco(project_id: int, db: Session = Depends(get_db)) -> StreamingResponse:
|
|
"""Export all annotations for a project as a COCO-format JSON file."""
|
|
project = db.query(Project).filter(Project.id == project_id).first()
|
|
if not project:
|
|
raise HTTPException(status_code=404, detail="Project not found")
|
|
|
|
annotations = (
|
|
db.query(Annotation)
|
|
.filter(Annotation.project_id == project_id)
|
|
.all()
|
|
)
|
|
frames = (
|
|
db.query(Frame)
|
|
.filter(Frame.project_id == project_id)
|
|
.order_by(Frame.frame_index)
|
|
.all()
|
|
)
|
|
templates = db.query(Template).all()
|
|
|
|
# Build COCO structure
|
|
images = []
|
|
for idx, frame in enumerate(frames):
|
|
images.append({
|
|
"id": frame.id,
|
|
"file_name": frame.image_url,
|
|
"width": frame.width or 1920,
|
|
"height": frame.height or 1080,
|
|
"frame_index": idx,
|
|
})
|
|
|
|
categories = []
|
|
template_id_to_cat_id: Dict[int, int] = {}
|
|
for cat_idx, tmpl in enumerate(templates, start=1):
|
|
categories.append({
|
|
"id": cat_idx,
|
|
"name": tmpl.name,
|
|
"color": tmpl.color,
|
|
})
|
|
template_id_to_cat_id[tmpl.id] = cat_idx
|
|
|
|
coco_annotations = []
|
|
ann_id = 1
|
|
for ann in annotations:
|
|
if not ann.mask_data:
|
|
continue
|
|
polygons = ann.mask_data.get("polygons", [])
|
|
if not polygons:
|
|
continue
|
|
|
|
# Use first polygon for bbox / area approximation
|
|
first_poly = polygons[0]
|
|
xs = [p[0] for p in first_poly]
|
|
ys = [p[1] for p in first_poly]
|
|
width = ann.frame.width if ann.frame else 1920
|
|
height = ann.frame.height if ann.frame else 1080
|
|
bbox = [
|
|
min(xs) * width,
|
|
min(ys) * height,
|
|
(max(xs) - min(xs)) * width,
|
|
(max(ys) - min(ys)) * height,
|
|
]
|
|
area = bbox[2] * bbox[3]
|
|
|
|
segmentation = []
|
|
for poly in polygons:
|
|
flat = []
|
|
for p in poly:
|
|
flat.append(p[0] * width)
|
|
flat.append(p[1] * height)
|
|
segmentation.append(flat)
|
|
|
|
coco_annotations.append({
|
|
"id": ann_id,
|
|
"image_id": ann.frame_id,
|
|
"category_id": template_id_to_cat_id.get(ann.template_id, 0),
|
|
"segmentation": segmentation,
|
|
"area": area,
|
|
"bbox": bbox,
|
|
"iscrowd": 0,
|
|
})
|
|
ann_id += 1
|
|
|
|
coco = {
|
|
"info": {
|
|
"description": f"Annotations for {project.name}",
|
|
"version": "1.0",
|
|
"year": datetime.now().year,
|
|
"date_created": datetime.now().isoformat(),
|
|
},
|
|
"images": images,
|
|
"annotations": coco_annotations,
|
|
"categories": categories,
|
|
}
|
|
|
|
data = json.dumps(coco, ensure_ascii=False, indent=2).encode("utf-8")
|
|
filename = f"project_{project_id}_coco.json"
|
|
|
|
return StreamingResponse(
|
|
io.BytesIO(data),
|
|
media_type="application/json",
|
|
headers={"Content-Disposition": f'attachment; filename="{filename}"'},
|
|
)
|
|
|
|
|
|
@router.get(
|
|
"/{project_id}/masks",
|
|
summary="Export PNG masks as a ZIP archive",
|
|
)
|
|
def export_masks(project_id: int, db: Session = Depends(get_db)) -> StreamingResponse:
|
|
"""Export individual masks plus z-index fused semantic masks inside a ZIP."""
|
|
project = db.query(Project).filter(Project.id == project_id).first()
|
|
if not project:
|
|
raise HTTPException(status_code=404, detail="Project not found")
|
|
|
|
import cv2
|
|
|
|
annotations = (
|
|
db.query(Annotation)
|
|
.filter(Annotation.project_id == project_id)
|
|
.all()
|
|
)
|
|
frames = (
|
|
db.query(Frame)
|
|
.filter(Frame.project_id == project_id)
|
|
.order_by(Frame.frame_index)
|
|
.all()
|
|
)
|
|
|
|
class_values: dict[str, int] = {}
|
|
semantic_classes: list[dict[str, Any]] = []
|
|
|
|
def class_value(annotation: Annotation) -> int:
|
|
key = _annotation_class_key(annotation)
|
|
if key not in class_values:
|
|
value = len(class_values) + 1
|
|
class_values[key] = value
|
|
semantic_classes.append({
|
|
"value": value,
|
|
"key": key,
|
|
"label": _annotation_label(annotation),
|
|
"color": _annotation_color(annotation),
|
|
"zIndex": _annotation_z_index(annotation),
|
|
"template_id": annotation.template_id,
|
|
})
|
|
return class_values[key]
|
|
|
|
zip_buffer = io.BytesIO()
|
|
with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zf:
|
|
frame_masks: dict[int, list[tuple[Annotation, np.ndarray]]] = {}
|
|
for ann in annotations:
|
|
if not ann.mask_data:
|
|
continue
|
|
polygons = ann.mask_data.get("polygons", [])
|
|
if not polygons:
|
|
continue
|
|
|
|
width = ann.frame.width if ann.frame else 1920
|
|
height = ann.frame.height if ann.frame else 1080
|
|
combined = np.zeros((height, width), dtype=np.uint8)
|
|
|
|
for poly in polygons:
|
|
mask = _mask_from_polygon(poly, width, height)
|
|
combined = np.maximum(combined, mask)
|
|
|
|
_, encoded = cv2.imencode(".png", combined)
|
|
fname = f"mask_{ann.id:06d}.png"
|
|
zf.writestr(fname, encoded.tobytes())
|
|
if ann.frame_id is not None:
|
|
frame_masks.setdefault(ann.frame_id, []).append((ann, combined))
|
|
|
|
for frame in frames:
|
|
entries = frame_masks.get(frame.id, [])
|
|
if not entries:
|
|
continue
|
|
width = frame.width or 1920
|
|
height = frame.height or 1080
|
|
semantic = np.zeros((height, width), dtype=np.uint8)
|
|
for ann, mask in sorted(entries, key=lambda item: _annotation_z_index(item[0])):
|
|
semantic[mask > 0] = class_value(ann)
|
|
_, encoded = cv2.imencode(".png", semantic)
|
|
zf.writestr(f"semantic_frame_{frame.frame_index:06d}.png", encoded.tobytes())
|
|
|
|
zf.writestr(
|
|
"semantic_classes.json",
|
|
json.dumps({"classes": semantic_classes}, ensure_ascii=False, indent=2).encode("utf-8"),
|
|
)
|
|
|
|
zip_buffer.seek(0)
|
|
filename = f"project_{project_id}_masks.zip"
|
|
|
|
return StreamingResponse(
|
|
zip_buffer,
|
|
media_type="application/zip",
|
|
headers={"Content-Disposition": f'attachment; filename="{filename}"'},
|
|
)
|