"""Annotation export endpoints (COCO, PNG masks).""" import io import json import logging import os import zipfile from datetime import datetime from typing import Any, Dict, List import numpy as np from fastapi import APIRouter, Depends, HTTPException, status from fastapi.responses import StreamingResponse from sqlalchemy.orm import Session from database import get_db from models import Project, Annotation, Frame, Template logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/export", tags=["Export"]) def _mask_from_polygon( polygon: List[List[float]], width: int, height: int, ) -> np.ndarray: """Render a normalized polygon to a binary mask.""" import cv2 pts = np.array( [[int(p[0] * width), int(p[1] * height)] for p in polygon], dtype=np.int32, ) mask = np.zeros((height, width), dtype=np.uint8) cv2.fillPoly(mask, [pts], 255) return mask def _annotation_z_index(annotation: Annotation) -> int: class_meta = (annotation.mask_data or {}).get("class") or {} if isinstance(class_meta, dict) and class_meta.get("zIndex") is not None: try: return int(class_meta["zIndex"]) except (TypeError, ValueError): pass if annotation.template and annotation.template.z_index is not None: return int(annotation.template.z_index) return 0 def _annotation_class_key(annotation: Annotation) -> str: class_meta = (annotation.mask_data or {}).get("class") or {} if isinstance(class_meta, dict): if class_meta.get("id"): return f"class:{class_meta['id']}" if class_meta.get("name"): return f"name:{class_meta['name']}" if annotation.template_id: return f"template:{annotation.template_id}" return f"annotation:{annotation.id}" def _annotation_label(annotation: Annotation) -> str: mask_data = annotation.mask_data or {} class_meta = mask_data.get("class") or {} if isinstance(class_meta, dict) and class_meta.get("name"): return str(class_meta["name"]) if mask_data.get("label"): return str(mask_data["label"]) if annotation.template and annotation.template.name: return str(annotation.template.name) return f"Annotation {annotation.id}" def _annotation_color(annotation: Annotation) -> str: mask_data = annotation.mask_data or {} class_meta = mask_data.get("class") or {} if isinstance(class_meta, dict) and class_meta.get("color"): return str(class_meta["color"]) if mask_data.get("color"): return str(mask_data["color"]) if annotation.template and annotation.template.color: return str(annotation.template.color) return "#ffffff" @router.get( "/{project_id}/coco", summary="Export annotations in COCO format", ) def export_coco(project_id: int, db: Session = Depends(get_db)) -> StreamingResponse: """Export all annotations for a project as a COCO-format JSON file.""" project = db.query(Project).filter(Project.id == project_id).first() if not project: raise HTTPException(status_code=404, detail="Project not found") annotations = ( db.query(Annotation) .filter(Annotation.project_id == project_id) .all() ) frames = ( db.query(Frame) .filter(Frame.project_id == project_id) .order_by(Frame.frame_index) .all() ) templates = db.query(Template).all() # Build COCO structure images = [] for idx, frame in enumerate(frames): images.append({ "id": frame.id, "file_name": frame.image_url, "width": frame.width or 1920, "height": frame.height or 1080, "frame_index": idx, }) categories = [] template_id_to_cat_id: Dict[int, int] = {} for cat_idx, tmpl in enumerate(templates, start=1): categories.append({ "id": cat_idx, "name": tmpl.name, "color": tmpl.color, }) template_id_to_cat_id[tmpl.id] = cat_idx coco_annotations = [] ann_id = 1 for ann in annotations: if not ann.mask_data: continue polygons = ann.mask_data.get("polygons", []) if not polygons: continue # Use first polygon for bbox / area approximation first_poly = polygons[0] xs = [p[0] for p in first_poly] ys = [p[1] for p in first_poly] width = ann.frame.width if ann.frame else 1920 height = ann.frame.height if ann.frame else 1080 bbox = [ min(xs) * width, min(ys) * height, (max(xs) - min(xs)) * width, (max(ys) - min(ys)) * height, ] area = bbox[2] * bbox[3] segmentation = [] for poly in polygons: flat = [] for p in poly: flat.append(p[0] * width) flat.append(p[1] * height) segmentation.append(flat) coco_annotations.append({ "id": ann_id, "image_id": ann.frame_id, "category_id": template_id_to_cat_id.get(ann.template_id, 0), "segmentation": segmentation, "area": area, "bbox": bbox, "iscrowd": 0, }) ann_id += 1 coco = { "info": { "description": f"Annotations for {project.name}", "version": "1.0", "year": datetime.now().year, "date_created": datetime.now().isoformat(), }, "images": images, "annotations": coco_annotations, "categories": categories, } data = json.dumps(coco, ensure_ascii=False, indent=2).encode("utf-8") filename = f"project_{project_id}_coco.json" return StreamingResponse( io.BytesIO(data), media_type="application/json", headers={"Content-Disposition": f'attachment; filename="{filename}"'}, ) @router.get( "/{project_id}/masks", summary="Export PNG masks as a ZIP archive", ) def export_masks(project_id: int, db: Session = Depends(get_db)) -> StreamingResponse: """Export individual masks plus z-index fused semantic masks inside a ZIP.""" project = db.query(Project).filter(Project.id == project_id).first() if not project: raise HTTPException(status_code=404, detail="Project not found") import cv2 annotations = ( db.query(Annotation) .filter(Annotation.project_id == project_id) .all() ) frames = ( db.query(Frame) .filter(Frame.project_id == project_id) .order_by(Frame.frame_index) .all() ) class_values: dict[str, int] = {} semantic_classes: list[dict[str, Any]] = [] def class_value(annotation: Annotation) -> int: key = _annotation_class_key(annotation) if key not in class_values: value = len(class_values) + 1 class_values[key] = value semantic_classes.append({ "value": value, "key": key, "label": _annotation_label(annotation), "color": _annotation_color(annotation), "zIndex": _annotation_z_index(annotation), "template_id": annotation.template_id, }) return class_values[key] zip_buffer = io.BytesIO() with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zf: frame_masks: dict[int, list[tuple[Annotation, np.ndarray]]] = {} for ann in annotations: if not ann.mask_data: continue polygons = ann.mask_data.get("polygons", []) if not polygons: continue width = ann.frame.width if ann.frame else 1920 height = ann.frame.height if ann.frame else 1080 combined = np.zeros((height, width), dtype=np.uint8) for poly in polygons: mask = _mask_from_polygon(poly, width, height) combined = np.maximum(combined, mask) _, encoded = cv2.imencode(".png", combined) fname = f"mask_{ann.id:06d}.png" zf.writestr(fname, encoded.tobytes()) if ann.frame_id is not None: frame_masks.setdefault(ann.frame_id, []).append((ann, combined)) for frame in frames: entries = frame_masks.get(frame.id, []) if not entries: continue width = frame.width or 1920 height = frame.height or 1080 semantic = np.zeros((height, width), dtype=np.uint8) for ann, mask in sorted(entries, key=lambda item: _annotation_z_index(item[0])): semantic[mask > 0] = class_value(ann) _, encoded = cv2.imencode(".png", semantic) zf.writestr(f"semantic_frame_{frame.frame_index:06d}.png", encoded.tobytes()) zf.writestr( "semantic_classes.json", json.dumps({"classes": semantic_classes}, ensure_ascii=False, indent=2).encode("utf-8"), ) zip_buffer.seek(0) filename = f"project_{project_id}_masks.zip" return StreamingResponse( zip_buffer, media_type="application/zip", headers={"Content-Disposition": f'attachment; filename="{filename}"'}, )