Files
Pre_Seg_Server/backend/tests/test_export.py
admin 481ffa5b67 完善项目导入、模板与分割工作区交互
- 增强 DICOM/视频项目导入与演示数据:DICOM 按文件名自然顺序处理,导入后展示上传与解析任务进度,恢复演示出厂设置保留演示视频和演示 DICOM 项目,并补充 demo media seed 逻辑。

- 完善项目管理:项目支持重命名、删除、复制,删除使用站内确认弹窗,复制支持新项目重置和全内容复制,DICOM 项目不显示生成帧入口。

- 完善 GT Mask 与导出链路:只支持 8-bit maskid 图导入,非法/全背景图明确拒绝,尺寸自动适配,高精度 polygon 回显;统一导出默认当前帧,GT_label 使用 uint8 和真实 maskid,待分类 maskid 0 与背景一致。

- 完善分割工作区交互:新增画笔和橡皮擦并支持尺寸控制,移除创建点/线段入口,工具栏按类别分隔,AI 智能分割使用明确 AI 图标,取消黄色 seed point,清空/删除传播 mask 后同步清理空帧时间轴状态。

- 完善传播与时间轴:自动传播使用 SAM 2.1 权重任务,参考帧无遮罩时提示,传播历史按同一蓝色系递进变暗,删除/清空传播链时保留人工或独立 AI 标注来源。

- 完善模板库:新增头颈部 CT 分割默认模板,所有模板保留 maskid 0 待分类,支持鼠标复制模板、拖拽层级、JSON 批量导入预览、删除 label 和站内删除确认。

- 完善用户与高风险确认:用户改密码、删除用户、恢复演示出厂设置和清空人工/AI 标注帧均改为站内确认交互,避免浏览器原生 prompt/confirm。

- 补充前后端测试与文档:更新项目、模板、GT 导入、导出、传播、DICOM、用户管理等测试,并同步 README、AGENTS 和 doc 下实现/契约/测试计划文档。
2026-05-03 17:11:59 +08:00

550 lines
22 KiB
Python

import zipfile
import json
from io import BytesIO
from urllib.parse import unquote
import cv2
import numpy as np
def _fake_image_bytes(width=100, height=50, color=(255, 255, 255)):
image = np.full((height, width, 3), color, dtype=np.uint8)
_, encoded = cv2.imencode(".jpg", image)
return encoded.tobytes()
def _seed_export_data(client):
project = client.post("/api/projects", json={
"name": "Export Project",
"video_path": "uploads/1/clip.mp4",
}).json()
frame = client.post(f"/api/projects/{project['id']}/frames", json={
"project_id": project["id"],
"frame_index": 0,
"image_url": "frames/0.jpg",
"width": 100,
"height": 50,
"timestamp_ms": 1250.0,
"source_frame_number": 37,
}).json()
template = client.post("/api/templates", json={
"name": "Category",
"color": "#06b6d4",
"z_index": 0,
"classes": [],
"rules": [],
}).json()
annotation = client.post("/api/ai/annotate", json={
"project_id": project["id"],
"frame_id": frame["id"],
"template_id": template["id"],
"mask_data": {"polygons": [[[0.1, 0.2], [0.9, 0.2], [0.9, 0.8], [0.1, 0.8]]]},
"points": [[0.5, 0.5]],
"bbox": [0.1, 0.2, 0.8, 0.6],
}).json()
return project, frame, template, annotation
def test_export_coco_json_structure(client):
project, frame, _, _ = _seed_export_data(client)
response = client.get(f"/api/export/{project['id']}/coco")
assert response.status_code == 200
assert response.headers["content-type"].startswith("application/json")
data = response.json()
assert data["info"]["description"] == "Annotations for Export Project"
assert data["images"][0] == {
"id": frame["id"],
"file_name": "frames/0.jpg",
"width": 100,
"height": 50,
"frame_index": 0,
}
assert data["annotations"][0]["segmentation"] == [[10.0, 10.0, 90.0, 10.0, 90.0, 40.0, 10.0, 40.0]]
assert data["annotations"][0]["bbox"] == [10.0, 10.0, 80.0, 30.000000000000004]
assert data["categories"][0]["name"] == "Category"
def test_export_masks_zip(client):
project, _, _, annotation = _seed_export_data(client)
response = client.get(f"/api/export/{project['id']}/masks")
assert response.status_code == 200
assert response.headers["content-type"].startswith("application/zip")
with zipfile.ZipFile(BytesIO(response.content)) as archive:
assert archive.namelist() == [
f"mask_{annotation['id']:06d}.png",
"semantic_frame_000000.png",
"semantic_classes.json",
]
def test_export_masks_uses_z_index_for_semantic_fusion(client):
project = client.post("/api/projects", json={"name": "Fusion Project"}).json()
frame = client.post(f"/api/projects/{project['id']}/frames", json={
"project_id": project["id"],
"frame_index": 0,
"image_url": "frames/0.jpg",
"width": 20,
"height": 20,
}).json()
low = client.post("/api/ai/annotate", json={
"project_id": project["id"],
"frame_id": frame["id"],
"mask_data": {
"polygons": [[[0.1, 0.1], [0.8, 0.1], [0.8, 0.8], [0.1, 0.8]]],
"label": "Low",
"color": "#00ff00",
"class": {"id": "low", "name": "Low", "color": "#00ff00", "zIndex": 10},
},
}).json()
high = client.post("/api/ai/annotate", json={
"project_id": project["id"],
"frame_id": frame["id"],
"mask_data": {
"polygons": [[[0.4, 0.4], [0.9, 0.4], [0.9, 0.9], [0.4, 0.9]]],
"label": "High",
"color": "#ff0000",
"class": {"id": "high", "name": "High", "color": "#ff0000", "zIndex": 20},
},
}).json()
response = client.get(f"/api/export/{project['id']}/masks")
assert response.status_code == 200
with zipfile.ZipFile(BytesIO(response.content)) as archive:
assert f"mask_{low['id']:06d}.png" in archive.namelist()
assert f"mask_{high['id']:06d}.png" in archive.namelist()
legend = json.loads(archive.read("semantic_classes.json"))
high_value = next(item["value"] for item in legend["classes"] if item["key"] == "class:high")
semantic_bytes = np.frombuffer(archive.read("semantic_frame_000000.png"), dtype=np.uint8)
semantic = cv2.imdecode(semantic_bytes, cv2.IMREAD_GRAYSCALE)
assert semantic[10, 10] == high_value
def test_export_results_zip_contains_coco_original_images_and_selected_mask_outputs(client, monkeypatch):
project, _, _, annotation = _seed_export_data(client)
monkeypatch.setattr("routers.export.download_file", lambda object_name: _fake_image_bytes())
response = client.get(f"/api/export/{project['id']}/results?scope=all&mask_type=both")
assert response.status_code == 200
assert response.headers["content-type"].startswith("application/zip")
with zipfile.ZipFile(BytesIO(response.content)) as archive:
names = archive.namelist()
frame_stem = "clip_0h00m01s250ms_frame000001"
assert "annotations_coco.json" in names
assert "maskid_GT像素值_类别映射.json" in names
assert f"原始图片/{frame_stem}.jpg" in names
assert f"分开Mask分割结果/{frame_stem}_分别导出/{frame_stem}_Category_maskid1.png" in names
assert f"GT_label图/{frame_stem}.png" in names
assert f"Pro_label彩色分割结果/{frame_stem}.png" in names
assert f"Mix_label重叠覆盖彩色分割结果/{frame_stem}.png" in names
coco = json.loads(archive.read("annotations_coco.json"))
mapping = json.loads(archive.read("maskid_GT像素值_类别映射.json"))
label_bytes = np.frombuffer(archive.read(f"GT_label图/{frame_stem}.png"), dtype=np.uint8)
gt_label = cv2.imdecode(label_bytes, cv2.IMREAD_UNCHANGED)
pro_label = cv2.imdecode(
np.frombuffer(archive.read(f"Pro_label彩色分割结果/{frame_stem}.png"), dtype=np.uint8),
cv2.IMREAD_COLOR,
)
mix_label = cv2.imdecode(
np.frombuffer(archive.read(f"Mix_label重叠覆盖彩色分割结果/{frame_stem}.png"), dtype=np.uint8),
cv2.IMREAD_COLOR,
)
assert coco["images"][0]["frame_index"] == 0
assert coco["annotations"][0]["image_id"] == annotation["frame_id"]
assert mapping["classes"] == [{
"gt_pixel_value": 1,
"maskid": 1,
"chineseName": "Category",
"className": "Category",
"categoryName": "Category",
"rgb": [6, 182, 212],
"color": "#06b6d4",
"key": f"template:{annotation['template_id']}",
"template_id": annotation["template_id"],
}]
assert gt_label.dtype == np.uint8
assert gt_label[0, 0] == 0
assert gt_label[20, 50] == 1
assert pro_label[20, 50].tolist() == [212, 182, 6]
assert pro_label[0, 0].tolist() == [0, 0, 0]
assert mix_label[20, 50].tolist() != [255, 255, 255]
def test_export_results_uses_internal_layer_order_for_gt_pro_and_mix_outputs(client, monkeypatch):
monkeypatch.setattr("routers.export.download_file", lambda object_name: _fake_image_bytes(20, 20))
project = client.post("/api/projects", json={
"name": "Layered Export Project",
"video_path": "uploads/2/layered.mp4",
}).json()
frame = client.post(f"/api/projects/{project['id']}/frames", json={
"project_id": project["id"],
"frame_index": 0,
"image_url": "frames/layered.jpg",
"width": 20,
"height": 20,
"timestamp_ms": 0,
"source_frame_number": 0,
}).json()
client.post("/api/ai/annotate", json={
"project_id": project["id"],
"frame_id": frame["id"],
"mask_data": {
"polygons": [[[0.1, 0.1], [0.8, 0.1], [0.8, 0.8], [0.1, 0.8]]],
"label": "Low",
"color": "#00ff00",
"class": {"id": "low", "name": "Low", "color": "#00ff00", "zIndex": 10},
},
})
client.post("/api/ai/annotate", json={
"project_id": project["id"],
"frame_id": frame["id"],
"mask_data": {
"polygons": [[[0.4, 0.4], [0.9, 0.4], [0.9, 0.9], [0.4, 0.9]]],
"label": "High",
"color": "#ff0000",
"class": {"id": "high", "name": "High", "color": "#ff0000", "zIndex": 20},
},
})
response = client.get(
f"/api/export/{project['id']}/results?scope=all&outputs=gt_label,pro_label,mix_label&mix_opacity=0.5",
)
assert response.status_code == 200
with zipfile.ZipFile(BytesIO(response.content)) as archive:
mapping = json.loads(archive.read("maskid_GT像素值_类别映射.json"))
high_value = next(item["maskid"] for item in mapping["classes"] if item["key"] == "class:high")
stem = "layered_0h00m00s000ms_frame000001"
gt_label = cv2.imdecode(
np.frombuffer(archive.read(f"GT_label图/{stem}.png"), dtype=np.uint8),
cv2.IMREAD_UNCHANGED,
)
pro_label = cv2.imdecode(
np.frombuffer(archive.read(f"Pro_label彩色分割结果/{stem}.png"), dtype=np.uint8),
cv2.IMREAD_COLOR,
)
mix_label = cv2.imdecode(
np.frombuffer(archive.read(f"Mix_label重叠覆盖彩色分割结果/{stem}.png"), dtype=np.uint8),
cv2.IMREAD_COLOR,
)
assert gt_label.dtype == np.uint8
assert gt_label[10, 10] == high_value
assert pro_label[10, 10].tolist() == [0, 0, 255]
assert mix_label[10, 10].tolist() == [127, 127, 255]
def test_export_results_supports_range_and_current_scope(client, monkeypatch):
monkeypatch.setattr("routers.export.download_file", lambda object_name: _fake_image_bytes(20, 20))
project = client.post("/api/projects", json={
"name": "Scoped Export Project",
"video_path": "uploads/9/scope.mp4",
"parse_fps": 2,
}).json()
template = client.post("/api/templates", json={
"name": "Scoped Category",
"color": "#06b6d4",
"z_index": 0,
"classes": [],
"rules": [],
}).json()
frames = []
annotations = []
for idx in range(3):
frame = client.post(f"/api/projects/{project['id']}/frames", json={
"project_id": project["id"],
"frame_index": idx,
"image_url": f"frames/{idx}.jpg",
"width": 20,
"height": 20,
"timestamp_ms": idx * 500.0,
"source_frame_number": idx * 10,
}).json()
frames.append(frame)
annotations.append(client.post("/api/ai/annotate", json={
"project_id": project["id"],
"frame_id": frame["id"],
"template_id": template["id"],
"mask_data": {"polygons": [[[0.1, 0.1], [0.8, 0.1], [0.8, 0.8], [0.1, 0.8]]]},
}).json())
range_response = client.get(
f"/api/export/{project['id']}/results?scope=range&start_frame=2&end_frame=3&mask_type=gt_label",
)
current_response = client.get(
f"/api/export/{project['id']}/results?scope=current&frame_id={frames[1]['id']}&mask_type=separate",
)
assert range_response.status_code == 200
assert "Scoped_Export_Project_seg_T_0h00m00s500ms-0h00m01s000ms_P_2-3.zip" in unquote(
range_response.headers["content-disposition"],
)
with zipfile.ZipFile(BytesIO(range_response.content)) as archive:
names = archive.namelist()
coco = json.loads(archive.read("annotations_coco.json"))
assert "原始图片/scope_0h00m00s500ms_frame000002.jpg" in names
assert "原始图片/scope_0h00m01s000ms_frame000003.jpg" in names
assert "原始图片/scope_0h00m00s000ms_frame000001.jpg" not in names
assert "GT_label图/scope_0h00m00s500ms_frame000002.png" in names
assert "GT_label图/scope_0h00m01s000ms_frame000003.png" in names
assert "GT_label图/scope_0h00m00s000ms_frame000001.png" not in names
assert not any(name.startswith("分开Mask分割结果/") for name in names)
assert not any(name.startswith("Pro_label彩色分割结果/") for name in names)
assert not any(name.startswith("Mix_label重叠覆盖彩色分割结果/") for name in names)
assert [image["frame_index"] for image in coco["images"]] == [1, 2]
assert current_response.status_code == 200
with zipfile.ZipFile(BytesIO(current_response.content)) as archive:
names = archive.namelist()
coco = json.loads(archive.read("annotations_coco.json"))
current_stem = "scope_0h00m00s500ms_frame000002"
assert f"原始图片/{current_stem}.jpg" in names
assert f"分开Mask分割结果/{current_stem}_分别导出/{current_stem}_Scoped_Category_maskid1.png" in names
assert f"分开Mask分割结果/scope_0h00m00s000ms_frame000001_分别导出/scope_0h00m00s000ms_frame000001_Scoped_Category_maskid1.png" not in names
assert not any(name.startswith("GT_label图/") for name in names)
assert not any(name.startswith("Pro_label彩色分割结果/") for name in names)
assert not any(name.startswith("Mix_label重叠覆盖彩色分割结果/") for name in names)
assert [image["id"] for image in coco["images"]] == [frames[1]["id"]]
def test_export_results_preserves_template_maskid_consistently_across_frames(client, monkeypatch):
monkeypatch.setattr("routers.export.download_file", lambda object_name: _fake_image_bytes(20, 20))
project = client.post("/api/projects", json={
"name": "MaskId Export Project",
"video_path": "uploads/8/maskid-demo.mp4",
"parse_fps": 1,
}).json()
frames = []
for idx in range(2):
frames.append(client.post(f"/api/projects/{project['id']}/frames", json={
"project_id": project["id"],
"frame_index": idx,
"image_url": f"frames/{idx}.jpg",
"width": 20,
"height": 20,
"timestamp_ms": idx * 1000.0,
"source_frame_number": idx,
}).json())
client.post("/api/ai/annotate", json={
"project_id": project["id"],
"frame_id": frames[-1]["id"],
"mask_data": {
"polygons": [[[0.1, 0.1], [0.8, 0.1], [0.8, 0.8], [0.1, 0.8]]],
"label": "Tumor",
"color": "#ff0000",
"class": {"id": "tumor", "name": "Tumor", "color": "#ff0000", "maskId": 7, "zIndex": 30},
},
})
response = client.get(f"/api/export/{project['id']}/results?scope=all&mask_type=both")
assert response.status_code == 200
with zipfile.ZipFile(BytesIO(response.content)) as archive:
names = archive.namelist()
mapping = json.loads(archive.read("maskid_GT像素值_类别映射.json"))
first_stem = "maskid-demo_0h00m00s000ms_frame000001"
second_stem = "maskid-demo_0h00m01s000ms_frame000002"
assert f"分开Mask分割结果/{first_stem}_分别导出/{first_stem}_Tumor_maskid7.png" in names
assert f"分开Mask分割结果/{second_stem}_分别导出/{second_stem}_Tumor_maskid7.png" in names
first_label = cv2.imdecode(np.frombuffer(archive.read(f"GT_label图/{first_stem}.png"), dtype=np.uint8), cv2.IMREAD_UNCHANGED)
second_label = cv2.imdecode(np.frombuffer(archive.read(f"GT_label图/{second_stem}.png"), dtype=np.uint8), cv2.IMREAD_UNCHANGED)
assert mapping["classes"] == [{
"gt_pixel_value": 7,
"maskid": 7,
"chineseName": "Tumor",
"className": "Tumor",
"categoryName": "",
"rgb": [255, 0, 0],
"color": "#ff0000",
"key": "class:tumor",
"template_id": None,
}]
assert first_label.dtype == np.uint8
assert second_label.dtype == np.uint8
assert first_label[5, 5] == 7
assert second_label[5, 5] == 7
def test_export_results_keeps_unclassified_maskid_zero_black_in_gt_and_pro(client, monkeypatch):
monkeypatch.setattr("routers.export.download_file", lambda object_name: _fake_image_bytes(20, 20))
project = client.post("/api/projects", json={
"name": "Unclassified Export Project",
"video_path": "uploads/8/unclassified.mp4",
}).json()
frame = client.post(f"/api/projects/{project['id']}/frames", json={
"project_id": project["id"],
"frame_index": 0,
"image_url": "frames/source.jpg",
"width": 20,
"height": 20,
"timestamp_ms": 0,
}).json()
client.post("/api/ai/annotate", json={
"project_id": project["id"],
"frame_id": frame["id"],
"mask_data": {
"polygons": [[[0.1, 0.1], [0.8, 0.1], [0.8, 0.8], [0.1, 0.8]]],
"label": "待分类",
"color": "#000000",
"class": {
"id": "reserved-unclassified",
"name": "待分类",
"color": "#000000",
"maskId": 0,
"zIndex": 0,
},
},
})
response = client.get(f"/api/export/{project['id']}/results?scope=all&outputs=gt_label,pro_label")
assert response.status_code == 200
with zipfile.ZipFile(BytesIO(response.content)) as archive:
mapping = json.loads(archive.read("maskid_GT像素值_类别映射.json"))
stem = "unclassified_0h00m00s000ms_frame000001"
gt_label = cv2.imdecode(
np.frombuffer(archive.read(f"GT_label图/{stem}.png"), dtype=np.uint8),
cv2.IMREAD_UNCHANGED,
)
pro_label = cv2.imdecode(
np.frombuffer(archive.read(f"Pro_label彩色分割结果/{stem}.png"), dtype=np.uint8),
cv2.IMREAD_COLOR,
)
assert mapping["classes"] == [{
"gt_pixel_value": 0,
"maskid": 0,
"chineseName": "待分类",
"className": "待分类",
"categoryName": "",
"rgb": [0, 0, 0],
"color": "#000000",
"key": "class:reserved-unclassified",
"template_id": None,
}]
assert gt_label.dtype == np.uint8
assert gt_label[5, 5] == 0
assert pro_label[5, 5].tolist() == [0, 0, 0]
def test_exported_gtlabel_round_trips_through_gt_mask_import_with_template_maskid(client, monkeypatch):
monkeypatch.setattr("routers.export.download_file", lambda object_name: _fake_image_bytes(20, 20))
project = client.post("/api/projects", json={
"name": "GT Roundtrip Project",
"video_path": "uploads/8/roundtrip.mp4",
}).json()
template = client.post("/api/templates", json={
"name": "Roundtrip Template",
"color": "#06b6d4",
"z_index": 0,
"classes": [
{"id": "tumor", "name": "Tumor", "color": "#ff0000", "zIndex": 30, "maskId": 7},
],
"rules": [],
}).json()
source_frame = client.post(f"/api/projects/{project['id']}/frames", json={
"project_id": project["id"],
"frame_index": 0,
"image_url": "frames/source.jpg",
"width": 20,
"height": 20,
"timestamp_ms": 0,
}).json()
target_frame = client.post(f"/api/projects/{project['id']}/frames", json={
"project_id": project["id"],
"frame_index": 1,
"image_url": "frames/target.jpg",
"width": 20,
"height": 20,
"timestamp_ms": 1000,
}).json()
client.post("/api/ai/annotate", json={
"project_id": project["id"],
"frame_id": source_frame["id"],
"template_id": template["id"],
"mask_data": {
"polygons": [[[0.1, 0.1], [0.8, 0.1], [0.8, 0.8], [0.1, 0.8]]],
"label": "Tumor",
"color": "#ff0000",
"class": {"id": "tumor", "name": "Tumor", "color": "#ff0000", "maskId": 7, "zIndex": 30},
},
})
export_response = client.get(
f"/api/export/{project['id']}/results?scope=current&frame_id={source_frame['id']}&outputs=gt_label",
)
assert export_response.status_code == 200
with zipfile.ZipFile(BytesIO(export_response.content)) as archive:
stem = "roundtrip_0h00m00s000ms_frame000001"
exported_gt_label = archive.read(f"GT_label图/{stem}.png")
gt_label = cv2.imdecode(np.frombuffer(exported_gt_label, dtype=np.uint8), cv2.IMREAD_UNCHANGED)
mapping = json.loads(archive.read("maskid_GT像素值_类别映射.json"))
assert gt_label.dtype == np.uint8
assert gt_label[5, 5] == 7
assert mapping["classes"][0]["maskid"] == 7
import_response = client.post(
"/api/ai/import-gt-mask",
data={
"project_id": str(project["id"]),
"frame_id": str(target_frame["id"]),
"template_id": str(template["id"]),
"unknown_color_policy": "discard",
},
files={"file": ("exported_gt_label.png", exported_gt_label, "image/png")},
)
assert import_response.status_code == 201
imported = import_response.json()
assert len(imported) == 1
assert imported[0]["frame_id"] == target_frame["id"]
assert imported[0]["mask_data"]["gt_label_value"] == 7
assert imported[0]["mask_data"]["label"] == "Tumor"
assert imported[0]["mask_data"]["class"]["maskId"] == 7
def test_export_results_rejects_gtlabel_maskid_outside_uint8_range(client, monkeypatch):
monkeypatch.setattr("routers.export.download_file", lambda object_name: _fake_image_bytes(20, 20))
project = client.post("/api/projects", json={
"name": "Large MaskId Project",
"video_path": "uploads/8/large-maskid.mp4",
}).json()
frame = client.post(f"/api/projects/{project['id']}/frames", json={
"project_id": project["id"],
"frame_index": 0,
"image_url": "frames/source.jpg",
"width": 20,
"height": 20,
}).json()
client.post("/api/ai/annotate", json={
"project_id": project["id"],
"frame_id": frame["id"],
"mask_data": {
"polygons": [[[0.1, 0.1], [0.8, 0.1], [0.8, 0.8], [0.1, 0.8]]],
"label": "TooLarge",
"color": "#ff0000",
"class": {"id": "too-large", "name": "TooLarge", "color": "#ff0000", "maskId": 300, "zIndex": 30},
},
})
response = client.get(f"/api/export/{project['id']}/results?scope=all&outputs=gt_label")
assert response.status_code == 400
assert "8-bit maskid" in response.json()["detail"]
def test_export_missing_project_returns_404(client):
assert client.get("/api/export/999/coco").status_code == 404
assert client.get("/api/export/999/masks").status_code == 404
assert client.get("/api/export/999/results").status_code == 404