import zipfile import json from io import BytesIO from urllib.parse import unquote import cv2 import numpy as np def _fake_image_bytes(width=100, height=50, color=(255, 255, 255)): image = np.full((height, width, 3), color, dtype=np.uint8) _, encoded = cv2.imencode(".jpg", image) return encoded.tobytes() def _seed_export_data(client): project = client.post("/api/projects", json={ "name": "Export Project", "video_path": "uploads/1/clip.mp4", }).json() frame = client.post(f"/api/projects/{project['id']}/frames", json={ "project_id": project["id"], "frame_index": 0, "image_url": "frames/0.jpg", "width": 100, "height": 50, "timestamp_ms": 1250.0, "source_frame_number": 37, }).json() template = client.post("/api/templates", json={ "name": "Category", "color": "#06b6d4", "z_index": 0, "classes": [], "rules": [], }).json() annotation = client.post("/api/ai/annotate", json={ "project_id": project["id"], "frame_id": frame["id"], "template_id": template["id"], "mask_data": {"polygons": [[[0.1, 0.2], [0.9, 0.2], [0.9, 0.8], [0.1, 0.8]]]}, "points": [[0.5, 0.5]], "bbox": [0.1, 0.2, 0.8, 0.6], }).json() return project, frame, template, annotation def test_export_coco_json_structure(client): project, frame, _, _ = _seed_export_data(client) response = client.get(f"/api/export/{project['id']}/coco") assert response.status_code == 200 assert response.headers["content-type"].startswith("application/json") data = response.json() assert data["info"]["description"] == "Annotations for Export Project" assert data["images"][0] == { "id": frame["id"], "file_name": "frames/0.jpg", "width": 100, "height": 50, "frame_index": 0, } assert data["annotations"][0]["segmentation"] == [[10.0, 10.0, 90.0, 10.0, 90.0, 40.0, 10.0, 40.0]] assert data["annotations"][0]["bbox"] == [10.0, 10.0, 80.0, 30.000000000000004] assert data["categories"][0]["name"] == "Category" def test_export_masks_zip(client): project, _, _, annotation = _seed_export_data(client) response = client.get(f"/api/export/{project['id']}/masks") assert response.status_code == 200 assert response.headers["content-type"].startswith("application/zip") with zipfile.ZipFile(BytesIO(response.content)) as archive: assert archive.namelist() == [ f"mask_{annotation['id']:06d}.png", "semantic_frame_000000.png", "semantic_classes.json", ] def test_export_masks_uses_z_index_for_semantic_fusion(client): project = client.post("/api/projects", json={"name": "Fusion Project"}).json() frame = client.post(f"/api/projects/{project['id']}/frames", json={ "project_id": project["id"], "frame_index": 0, "image_url": "frames/0.jpg", "width": 20, "height": 20, }).json() low = client.post("/api/ai/annotate", json={ "project_id": project["id"], "frame_id": frame["id"], "mask_data": { "polygons": [[[0.1, 0.1], [0.8, 0.1], [0.8, 0.8], [0.1, 0.8]]], "label": "Low", "color": "#00ff00", "class": {"id": "low", "name": "Low", "color": "#00ff00", "zIndex": 10}, }, }).json() high = client.post("/api/ai/annotate", json={ "project_id": project["id"], "frame_id": frame["id"], "mask_data": { "polygons": [[[0.4, 0.4], [0.9, 0.4], [0.9, 0.9], [0.4, 0.9]]], "label": "High", "color": "#ff0000", "class": {"id": "high", "name": "High", "color": "#ff0000", "zIndex": 20}, }, }).json() response = client.get(f"/api/export/{project['id']}/masks") assert response.status_code == 200 with zipfile.ZipFile(BytesIO(response.content)) as archive: assert f"mask_{low['id']:06d}.png" in archive.namelist() assert f"mask_{high['id']:06d}.png" in archive.namelist() legend = json.loads(archive.read("semantic_classes.json")) high_value = next(item["value"] for item in legend["classes"] if item["key"] == "class:high") semantic_bytes = np.frombuffer(archive.read("semantic_frame_000000.png"), dtype=np.uint8) semantic = cv2.imdecode(semantic_bytes, cv2.IMREAD_GRAYSCALE) assert semantic[10, 10] == high_value def test_export_results_zip_contains_coco_original_images_and_selected_mask_outputs(client, monkeypatch): project, _, _, annotation = _seed_export_data(client) monkeypatch.setattr("routers.export.download_file", lambda object_name: _fake_image_bytes()) response = client.get(f"/api/export/{project['id']}/results?scope=all&mask_type=both") assert response.status_code == 200 assert response.headers["content-type"].startswith("application/zip") with zipfile.ZipFile(BytesIO(response.content)) as archive: names = archive.namelist() frame_stem = "clip_0h00m01s250ms_frame000001" assert "annotations_coco.json" in names assert "maskid_GT像素值_类别映射.json" in names assert f"原始图片/{frame_stem}.jpg" in names assert f"分开Mask分割结果/{frame_stem}_分别导出/{frame_stem}_Category_maskid1.png" in names assert f"GT_label图/{frame_stem}.png" in names assert f"Pro_label彩色分割结果/{frame_stem}.png" in names assert f"Mix_label重叠覆盖彩色分割结果/{frame_stem}.png" in names coco = json.loads(archive.read("annotations_coco.json")) mapping = json.loads(archive.read("maskid_GT像素值_类别映射.json")) label_bytes = np.frombuffer(archive.read(f"GT_label图/{frame_stem}.png"), dtype=np.uint8) gt_label = cv2.imdecode(label_bytes, cv2.IMREAD_UNCHANGED) pro_label = cv2.imdecode( np.frombuffer(archive.read(f"Pro_label彩色分割结果/{frame_stem}.png"), dtype=np.uint8), cv2.IMREAD_COLOR, ) mix_label = cv2.imdecode( np.frombuffer(archive.read(f"Mix_label重叠覆盖彩色分割结果/{frame_stem}.png"), dtype=np.uint8), cv2.IMREAD_COLOR, ) assert coco["images"][0]["frame_index"] == 0 assert coco["annotations"][0]["image_id"] == annotation["frame_id"] assert mapping["classes"] == [{ "gt_pixel_value": 1, "maskid": 1, "chineseName": "Category", "className": "Category", "categoryName": "Category", "rgb": [6, 182, 212], "color": "#06b6d4", "key": f"template:{annotation['template_id']}", "template_id": annotation["template_id"], }] assert gt_label.dtype == np.uint8 assert gt_label[0, 0] == 0 assert gt_label[20, 50] == 1 assert pro_label[20, 50].tolist() == [212, 182, 6] assert pro_label[0, 0].tolist() == [0, 0, 0] assert mix_label[20, 50].tolist() != [255, 255, 255] def test_export_results_uses_internal_layer_order_for_gt_pro_and_mix_outputs(client, monkeypatch): monkeypatch.setattr("routers.export.download_file", lambda object_name: _fake_image_bytes(20, 20)) project = client.post("/api/projects", json={ "name": "Layered Export Project", "video_path": "uploads/2/layered.mp4", }).json() frame = client.post(f"/api/projects/{project['id']}/frames", json={ "project_id": project["id"], "frame_index": 0, "image_url": "frames/layered.jpg", "width": 20, "height": 20, "timestamp_ms": 0, "source_frame_number": 0, }).json() client.post("/api/ai/annotate", json={ "project_id": project["id"], "frame_id": frame["id"], "mask_data": { "polygons": [[[0.1, 0.1], [0.8, 0.1], [0.8, 0.8], [0.1, 0.8]]], "label": "Low", "color": "#00ff00", "class": {"id": "low", "name": "Low", "color": "#00ff00", "zIndex": 10}, }, }) client.post("/api/ai/annotate", json={ "project_id": project["id"], "frame_id": frame["id"], "mask_data": { "polygons": [[[0.4, 0.4], [0.9, 0.4], [0.9, 0.9], [0.4, 0.9]]], "label": "High", "color": "#ff0000", "class": {"id": "high", "name": "High", "color": "#ff0000", "zIndex": 20}, }, }) response = client.get( f"/api/export/{project['id']}/results?scope=all&outputs=gt_label,pro_label,mix_label&mix_opacity=0.5", ) assert response.status_code == 200 with zipfile.ZipFile(BytesIO(response.content)) as archive: mapping = json.loads(archive.read("maskid_GT像素值_类别映射.json")) high_value = next(item["maskid"] for item in mapping["classes"] if item["key"] == "class:high") stem = "layered_0h00m00s000ms_frame000001" gt_label = cv2.imdecode( np.frombuffer(archive.read(f"GT_label图/{stem}.png"), dtype=np.uint8), cv2.IMREAD_UNCHANGED, ) pro_label = cv2.imdecode( np.frombuffer(archive.read(f"Pro_label彩色分割结果/{stem}.png"), dtype=np.uint8), cv2.IMREAD_COLOR, ) mix_label = cv2.imdecode( np.frombuffer(archive.read(f"Mix_label重叠覆盖彩色分割结果/{stem}.png"), dtype=np.uint8), cv2.IMREAD_COLOR, ) assert gt_label.dtype == np.uint8 assert gt_label[10, 10] == high_value assert pro_label[10, 10].tolist() == [0, 0, 255] assert mix_label[10, 10].tolist() == [127, 127, 255] def test_export_results_supports_range_and_current_scope(client, monkeypatch): monkeypatch.setattr("routers.export.download_file", lambda object_name: _fake_image_bytes(20, 20)) project = client.post("/api/projects", json={ "name": "Scoped Export Project", "video_path": "uploads/9/scope.mp4", "parse_fps": 2, }).json() template = client.post("/api/templates", json={ "name": "Scoped Category", "color": "#06b6d4", "z_index": 0, "classes": [], "rules": [], }).json() frames = [] annotations = [] for idx in range(3): frame = client.post(f"/api/projects/{project['id']}/frames", json={ "project_id": project["id"], "frame_index": idx, "image_url": f"frames/{idx}.jpg", "width": 20, "height": 20, "timestamp_ms": idx * 500.0, "source_frame_number": idx * 10, }).json() frames.append(frame) annotations.append(client.post("/api/ai/annotate", json={ "project_id": project["id"], "frame_id": frame["id"], "template_id": template["id"], "mask_data": {"polygons": [[[0.1, 0.1], [0.8, 0.1], [0.8, 0.8], [0.1, 0.8]]]}, }).json()) range_response = client.get( f"/api/export/{project['id']}/results?scope=range&start_frame=2&end_frame=3&mask_type=gt_label", ) current_response = client.get( f"/api/export/{project['id']}/results?scope=current&frame_id={frames[1]['id']}&mask_type=separate", ) assert range_response.status_code == 200 assert "Scoped_Export_Project_seg_T_0h00m00s500ms-0h00m01s000ms_P_2-3.zip" in unquote( range_response.headers["content-disposition"], ) with zipfile.ZipFile(BytesIO(range_response.content)) as archive: names = archive.namelist() coco = json.loads(archive.read("annotations_coco.json")) assert "原始图片/scope_0h00m00s500ms_frame000002.jpg" in names assert "原始图片/scope_0h00m01s000ms_frame000003.jpg" in names assert "原始图片/scope_0h00m00s000ms_frame000001.jpg" not in names assert "GT_label图/scope_0h00m00s500ms_frame000002.png" in names assert "GT_label图/scope_0h00m01s000ms_frame000003.png" in names assert "GT_label图/scope_0h00m00s000ms_frame000001.png" not in names assert not any(name.startswith("分开Mask分割结果/") for name in names) assert not any(name.startswith("Pro_label彩色分割结果/") for name in names) assert not any(name.startswith("Mix_label重叠覆盖彩色分割结果/") for name in names) assert [image["frame_index"] for image in coco["images"]] == [1, 2] assert current_response.status_code == 200 with zipfile.ZipFile(BytesIO(current_response.content)) as archive: names = archive.namelist() coco = json.loads(archive.read("annotations_coco.json")) current_stem = "scope_0h00m00s500ms_frame000002" assert f"原始图片/{current_stem}.jpg" in names assert f"分开Mask分割结果/{current_stem}_分别导出/{current_stem}_Scoped_Category_maskid1.png" in names assert f"分开Mask分割结果/scope_0h00m00s000ms_frame000001_分别导出/scope_0h00m00s000ms_frame000001_Scoped_Category_maskid1.png" not in names assert not any(name.startswith("GT_label图/") for name in names) assert not any(name.startswith("Pro_label彩色分割结果/") for name in names) assert not any(name.startswith("Mix_label重叠覆盖彩色分割结果/") for name in names) assert [image["id"] for image in coco["images"]] == [frames[1]["id"]] def test_export_results_preserves_template_maskid_consistently_across_frames(client, monkeypatch): monkeypatch.setattr("routers.export.download_file", lambda object_name: _fake_image_bytes(20, 20)) project = client.post("/api/projects", json={ "name": "MaskId Export Project", "video_path": "uploads/8/maskid-demo.mp4", "parse_fps": 1, }).json() frames = [] for idx in range(2): frames.append(client.post(f"/api/projects/{project['id']}/frames", json={ "project_id": project["id"], "frame_index": idx, "image_url": f"frames/{idx}.jpg", "width": 20, "height": 20, "timestamp_ms": idx * 1000.0, "source_frame_number": idx, }).json()) client.post("/api/ai/annotate", json={ "project_id": project["id"], "frame_id": frames[-1]["id"], "mask_data": { "polygons": [[[0.1, 0.1], [0.8, 0.1], [0.8, 0.8], [0.1, 0.8]]], "label": "Tumor", "color": "#ff0000", "class": {"id": "tumor", "name": "Tumor", "color": "#ff0000", "maskId": 7, "zIndex": 30}, }, }) response = client.get(f"/api/export/{project['id']}/results?scope=all&mask_type=both") assert response.status_code == 200 with zipfile.ZipFile(BytesIO(response.content)) as archive: names = archive.namelist() mapping = json.loads(archive.read("maskid_GT像素值_类别映射.json")) first_stem = "maskid-demo_0h00m00s000ms_frame000001" second_stem = "maskid-demo_0h00m01s000ms_frame000002" assert f"分开Mask分割结果/{first_stem}_分别导出/{first_stem}_Tumor_maskid7.png" in names assert f"分开Mask分割结果/{second_stem}_分别导出/{second_stem}_Tumor_maskid7.png" in names first_label = cv2.imdecode(np.frombuffer(archive.read(f"GT_label图/{first_stem}.png"), dtype=np.uint8), cv2.IMREAD_UNCHANGED) second_label = cv2.imdecode(np.frombuffer(archive.read(f"GT_label图/{second_stem}.png"), dtype=np.uint8), cv2.IMREAD_UNCHANGED) assert mapping["classes"] == [{ "gt_pixel_value": 7, "maskid": 7, "chineseName": "Tumor", "className": "Tumor", "categoryName": "", "rgb": [255, 0, 0], "color": "#ff0000", "key": "class:tumor", "template_id": None, }] assert first_label.dtype == np.uint8 assert second_label.dtype == np.uint8 assert first_label[5, 5] == 7 assert second_label[5, 5] == 7 def test_export_results_keeps_unclassified_maskid_zero_black_in_gt_and_pro(client, monkeypatch): monkeypatch.setattr("routers.export.download_file", lambda object_name: _fake_image_bytes(20, 20)) project = client.post("/api/projects", json={ "name": "Unclassified Export Project", "video_path": "uploads/8/unclassified.mp4", }).json() frame = client.post(f"/api/projects/{project['id']}/frames", json={ "project_id": project["id"], "frame_index": 0, "image_url": "frames/source.jpg", "width": 20, "height": 20, "timestamp_ms": 0, }).json() client.post("/api/ai/annotate", json={ "project_id": project["id"], "frame_id": frame["id"], "mask_data": { "polygons": [[[0.1, 0.1], [0.8, 0.1], [0.8, 0.8], [0.1, 0.8]]], "label": "待分类", "color": "#000000", "class": { "id": "reserved-unclassified", "name": "待分类", "color": "#000000", "maskId": 0, "zIndex": 0, }, }, }) response = client.get(f"/api/export/{project['id']}/results?scope=all&outputs=gt_label,pro_label") assert response.status_code == 200 with zipfile.ZipFile(BytesIO(response.content)) as archive: mapping = json.loads(archive.read("maskid_GT像素值_类别映射.json")) stem = "unclassified_0h00m00s000ms_frame000001" gt_label = cv2.imdecode( np.frombuffer(archive.read(f"GT_label图/{stem}.png"), dtype=np.uint8), cv2.IMREAD_UNCHANGED, ) pro_label = cv2.imdecode( np.frombuffer(archive.read(f"Pro_label彩色分割结果/{stem}.png"), dtype=np.uint8), cv2.IMREAD_COLOR, ) assert mapping["classes"] == [{ "gt_pixel_value": 0, "maskid": 0, "chineseName": "待分类", "className": "待分类", "categoryName": "", "rgb": [0, 0, 0], "color": "#000000", "key": "class:reserved-unclassified", "template_id": None, }] assert gt_label.dtype == np.uint8 assert gt_label[5, 5] == 0 assert pro_label[5, 5].tolist() == [0, 0, 0] def test_exported_gtlabel_round_trips_through_gt_mask_import_with_template_maskid(client, monkeypatch): monkeypatch.setattr("routers.export.download_file", lambda object_name: _fake_image_bytes(20, 20)) project = client.post("/api/projects", json={ "name": "GT Roundtrip Project", "video_path": "uploads/8/roundtrip.mp4", }).json() template = client.post("/api/templates", json={ "name": "Roundtrip Template", "color": "#06b6d4", "z_index": 0, "classes": [ {"id": "tumor", "name": "Tumor", "color": "#ff0000", "zIndex": 30, "maskId": 7}, ], "rules": [], }).json() source_frame = client.post(f"/api/projects/{project['id']}/frames", json={ "project_id": project["id"], "frame_index": 0, "image_url": "frames/source.jpg", "width": 20, "height": 20, "timestamp_ms": 0, }).json() target_frame = client.post(f"/api/projects/{project['id']}/frames", json={ "project_id": project["id"], "frame_index": 1, "image_url": "frames/target.jpg", "width": 20, "height": 20, "timestamp_ms": 1000, }).json() client.post("/api/ai/annotate", json={ "project_id": project["id"], "frame_id": source_frame["id"], "template_id": template["id"], "mask_data": { "polygons": [[[0.1, 0.1], [0.8, 0.1], [0.8, 0.8], [0.1, 0.8]]], "label": "Tumor", "color": "#ff0000", "class": {"id": "tumor", "name": "Tumor", "color": "#ff0000", "maskId": 7, "zIndex": 30}, }, }) export_response = client.get( f"/api/export/{project['id']}/results?scope=current&frame_id={source_frame['id']}&outputs=gt_label", ) assert export_response.status_code == 200 with zipfile.ZipFile(BytesIO(export_response.content)) as archive: stem = "roundtrip_0h00m00s000ms_frame000001" exported_gt_label = archive.read(f"GT_label图/{stem}.png") gt_label = cv2.imdecode(np.frombuffer(exported_gt_label, dtype=np.uint8), cv2.IMREAD_UNCHANGED) mapping = json.loads(archive.read("maskid_GT像素值_类别映射.json")) assert gt_label.dtype == np.uint8 assert gt_label[5, 5] == 7 assert mapping["classes"][0]["maskid"] == 7 import_response = client.post( "/api/ai/import-gt-mask", data={ "project_id": str(project["id"]), "frame_id": str(target_frame["id"]), "template_id": str(template["id"]), "unknown_color_policy": "discard", }, files={"file": ("exported_gt_label.png", exported_gt_label, "image/png")}, ) assert import_response.status_code == 201 imported = import_response.json() assert len(imported) == 1 assert imported[0]["frame_id"] == target_frame["id"] assert imported[0]["mask_data"]["gt_label_value"] == 7 assert imported[0]["mask_data"]["label"] == "Tumor" assert imported[0]["mask_data"]["class"]["maskId"] == 7 def test_export_results_rejects_gtlabel_maskid_outside_uint8_range(client, monkeypatch): monkeypatch.setattr("routers.export.download_file", lambda object_name: _fake_image_bytes(20, 20)) project = client.post("/api/projects", json={ "name": "Large MaskId Project", "video_path": "uploads/8/large-maskid.mp4", }).json() frame = client.post(f"/api/projects/{project['id']}/frames", json={ "project_id": project["id"], "frame_index": 0, "image_url": "frames/source.jpg", "width": 20, "height": 20, }).json() client.post("/api/ai/annotate", json={ "project_id": project["id"], "frame_id": frame["id"], "mask_data": { "polygons": [[[0.1, 0.1], [0.8, 0.1], [0.8, 0.8], [0.1, 0.8]]], "label": "TooLarge", "color": "#ff0000", "class": {"id": "too-large", "name": "TooLarge", "color": "#ff0000", "maskId": 300, "zIndex": 30}, }, }) response = client.get(f"/api/export/{project['id']}/results?scope=all&outputs=gt_label") assert response.status_code == 400 assert "8-bit maskid" in response.json()["detail"] def test_export_missing_project_returns_404(client): assert client.get("/api/export/999/coco").status_code == 404 assert client.get("/api/export/999/masks").status_code == 404 assert client.get("/api/export/999/results").status_code == 404