-
+
- 覆盖率
- 骨架长度
- 连通域
ISISeg
+介入导丝视频分割工作台
+Result
+commit debd0bfd50fad75885b97ad63c87a5c08aae81df Author: admin <572701190@qq.com> Date: Mon May 18 17:49:30 2026 +0800 2026-05-18-17-40-02 构建导丝分割Web系统 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a343022 --- /dev/null +++ b/.gitignore @@ -0,0 +1,10 @@ +__pycache__/ +*.py[cod] +.pytest_cache/ +.mypy_cache/ +.ruff_cache/ +.venv/ +venv/ +storage/jobs/ +storage/uploads/ +*.log diff --git a/2026_5_18_介入手术导丝分割方案.docx b/2026_5_18_介入手术导丝分割方案.docx new file mode 100644 index 0000000..b824402 Binary files /dev/null and b/2026_5_18_介入手术导丝分割方案.docx differ diff --git a/README.md b/README.md new file mode 100644 index 0000000..2e1db88 --- /dev/null +++ b/README.md @@ -0,0 +1,38 @@ +# ISISeg + +介入导丝视频分割 Web 系统第一版。系统支持上传 X 射线透视图片或视频,并使用多种无需训练数据的细线分割方法生成导丝掩膜、叠加图和结果视频。 + +## 功能 + +- Web 上传图片或视频。 +- 支持 `hessian_ridge`、`edge_morphology`、`temporal_difference`、`fusion`、`compare` 五种模式。 +- 显示原图、导丝叠加结果、掩膜、覆盖率、骨架长度和连通域数量。 +- 提供合成导丝视频生成脚本,便于快速验证。 + +## 启动 + +```bash +python3 -m pip install -r requirements.txt +bash scripts/generate_sample.sh +bash scripts/run_dev.sh +``` + +访问: + +```text +http://127.0.0.1:8000 +``` + +## 测试 + +```bash +pytest -q +``` + +## 目录 + +- `backend/`:FastAPI 服务与分割算法。 +- `frontend/`:Web 操作台。 +- `scripts/`:启动与样例生成脚本。 +- `tests/`:自动化测试。 +- `工程分析/`:需求、实现、测试与经验记录。 diff --git a/backend/__init__.py b/backend/__init__.py new file mode 100644 index 0000000..3fa4f7d --- /dev/null +++ b/backend/__init__.py @@ -0,0 +1 @@ +"""ISISeg backend package.""" diff --git a/backend/main.py b/backend/main.py new file mode 100644 index 0000000..7230905 --- /dev/null +++ b/backend/main.py @@ -0,0 +1,210 @@ +from __future__ import annotations + +import shutil +import uuid +from pathlib import Path +from typing import Any + +import cv2 +from fastapi import FastAPI, File, Form, HTTPException, UploadFile +from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses import FileResponse +from fastapi.staticfiles import StaticFiles + +from backend.segmentation import METHOD_DESCRIPTIONS, compare_frame, segment_frame + + +ROOT = Path(__file__).resolve().parents[1] +FRONTEND_DIR = ROOT / "frontend" +STORAGE_DIR = ROOT / "storage" +UPLOAD_DIR = STORAGE_DIR / "uploads" +JOB_DIR = STORAGE_DIR / "jobs" +SAMPLE_DIR = STORAGE_DIR / "samples" + +IMAGE_SUFFIXES = {".png", ".jpg", ".jpeg", ".bmp", ".tif", ".tiff"} +VIDEO_SUFFIXES = {".mp4", ".avi", ".mov", ".mkv", ".webm"} + +app = FastAPI(title="ISISeg Guidewire Segmentation", version="0.1.0") +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_methods=["*"], + allow_headers=["*"], +) + + +def ensure_dirs() -> None: + for directory in (UPLOAD_DIR, JOB_DIR, SAMPLE_DIR): + directory.mkdir(parents=True, exist_ok=True) + + +ensure_dirs() +app.mount("/storage", StaticFiles(directory=STORAGE_DIR), name="storage") + + +@app.get("/api/health") +def health() -> dict[str, str]: + return {"status": "ok", "service": "ISISeg", "version": app.version} + + +@app.get("/api/methods") +def methods() -> dict[str, Any]: + return {"methods": METHOD_DESCRIPTIONS} + + +def _public(path: Path) -> str: + return "/" + path.relative_to(ROOT).as_posix() + + +def _save_upload(file: UploadFile, job_path: Path) -> Path: + suffix = Path(file.filename or "").suffix.lower() + if suffix not in IMAGE_SUFFIXES | VIDEO_SUFFIXES: + raise HTTPException(status_code=400, detail="仅支持图片或视频文件") + destination = job_path / f"upload{suffix}" + with destination.open("wb") as buffer: + shutil.copyfileobj(file.file, buffer) + return destination + + +def _save_frame_outputs( + job_path: Path, + frame_index: int, + method: str, + frame, + output, +) -> dict[str, Any]: + method_path = job_path / method + method_path.mkdir(exist_ok=True) + original_path = method_path / f"frame_{frame_index:04d}_original.png" + mask_path = method_path / f"frame_{frame_index:04d}_mask.png" + overlay_path = method_path / f"frame_{frame_index:04d}_overlay.png" + cv2.imwrite(str(original_path), frame) + cv2.imwrite(str(mask_path), output.mask) + cv2.imwrite(str(overlay_path), output.overlay) + return { + "frame_index": frame_index, + "method": method, + "original_url": _public(original_path), + "mask_url": _public(mask_path), + "overlay_url": _public(overlay_path), + "metrics": output.metrics, + } + + +def _process_image(job_path: Path, source: Path, method: str, sensitivity: float) -> dict[str, Any]: + frame = cv2.imread(str(source), cv2.IMREAD_COLOR) + if frame is None: + raise HTTPException(status_code=400, detail="无法读取图片") + if method == "compare": + results = [ + _save_frame_outputs(job_path, 0, item.method, frame, item) + for item in compare_frame(frame, None, sensitivity) + ] + else: + output = segment_frame(frame, method, None, sensitivity) + results = [_save_frame_outputs(job_path, 0, method, frame, output)] + return {"kind": "image", "frames": results, "video_url": None} + + +def _selected_frame(index: int, stride: int, selected_count: int, max_frames: int) -> bool: + return index % max(1, stride) == 0 and selected_count < max_frames + + +def _process_video( + job_path: Path, + source: Path, + method: str, + sensitivity: float, + frame_stride: int, + max_frames: int, +) -> dict[str, Any]: + capture = cv2.VideoCapture(str(source)) + if not capture.isOpened(): + raise HTTPException(status_code=400, detail="无法读取视频") + + frames: list[dict[str, Any]] = [] + previous = None + frame_index = 0 + selected_count = 0 + writer = None + video_path = job_path / f"{method}_overlay.mp4" + + try: + while True: + ok, frame = capture.read() + if not ok: + break + if not _selected_frame(frame_index, frame_stride, selected_count, max_frames): + previous = frame + frame_index += 1 + continue + + if method == "compare": + outputs = compare_frame(frame, previous, sensitivity) + for output in outputs: + frames.append(_save_frame_outputs(job_path, frame_index, output.method, frame, output)) + video_output = next(item for item in outputs if item.method == "fusion") + else: + video_output = segment_frame(frame, method, previous, sensitivity) + frames.append(_save_frame_outputs(job_path, frame_index, method, frame, video_output)) + + if writer is None: + height, width = video_output.overlay.shape[:2] + fourcc = cv2.VideoWriter_fourcc(*"mp4v") + writer = cv2.VideoWriter(str(video_path), fourcc, 8.0, (width, height)) + writer.write(video_output.overlay) + + selected_count += 1 + previous = frame + frame_index += 1 + finally: + capture.release() + if writer is not None: + writer.release() + + if not frames: + raise HTTPException(status_code=400, detail="视频没有可处理帧") + return {"kind": "video", "frames": frames, "video_url": _public(video_path) if video_path.exists() else None} + + +@app.post("/api/segment") +def segment( + file: UploadFile = File(...), + method: str = Form("fusion"), + sensitivity: float = Form(0.56), + frame_stride: int = Form(5), + max_frames: int = Form(12), +) -> dict[str, Any]: + ensure_dirs() + if method not in METHOD_DESCRIPTIONS: + raise HTTPException(status_code=400, detail="未知分割方法") + sensitivity = max(0.05, min(float(sensitivity), 0.95)) + frame_stride = max(1, min(int(frame_stride), 90)) + max_frames = max(1, min(int(max_frames), 80)) + job_id = uuid.uuid4().hex[:12] + job_path = JOB_DIR / job_id + job_path.mkdir(parents=True, exist_ok=True) + source = _save_upload(file, job_path) + suffix = source.suffix.lower() + if suffix in IMAGE_SUFFIXES: + result = _process_image(job_path, source, method, sensitivity) + elif suffix in VIDEO_SUFFIXES: + result = _process_video(job_path, source, method, sensitivity, frame_stride, max_frames) + else: + raise HTTPException(status_code=400, detail="不支持的文件类型") + return { + "job_id": job_id, + "method": method, + "sensitivity": sensitivity, + "frame_stride": frame_stride, + "max_frames": max_frames, + **result, + } + + +@app.get("/") +def index() -> FileResponse: + return FileResponse(FRONTEND_DIR / "index.html") + + +app.mount("/", StaticFiles(directory=FRONTEND_DIR), name="frontend") diff --git a/backend/segmentation.py b/backend/segmentation.py new file mode 100644 index 0000000..60f196f --- /dev/null +++ b/backend/segmentation.py @@ -0,0 +1,232 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Callable + +import cv2 +import numpy as np +from skimage.filters import frangi, threshold_otsu +from skimage.morphology import remove_small_objects, skeletonize + + +METHOD_DESCRIPTIONS = { + "hessian_ridge": { + "label": "Hessian / Frangi 细线增强", + "description": "多尺度 Hessian 管状结构响应,适合低对比细导丝候选提取。", + "uses_temporal": False, + }, + "edge_morphology": { + "label": "边缘 + 形态学", + "description": "CLAHE、黑帽增强、Canny 边缘与线性形态学连接。", + "uses_temporal": False, + }, + "temporal_difference": { + "label": "视频时序差分", + "description": "利用相邻帧运动候选抑制静态骨骼和背景结构。", + "uses_temporal": True, + }, + "fusion": { + "label": "融合模式", + "description": "融合 Hessian、边缘形态学和时序差分,作为默认稳健输出。", + "uses_temporal": True, + }, + "compare": { + "label": "多方法对比", + "description": "对同一帧同时运行多种方法,便于调参和方案比较。", + "uses_temporal": True, + }, +} + + +@dataclass(frozen=True) +class SegmentationOutput: + method: str + mask: np.ndarray + overlay: np.ndarray + metrics: dict[str, float | int] + + +def normalize01(image: np.ndarray) -> np.ndarray: + image = image.astype(np.float32) + low = float(np.percentile(image, 1)) + high = float(np.percentile(image, 99)) + if high <= low: + return np.zeros_like(image, dtype=np.float32) + return np.clip((image - low) / (high - low), 0.0, 1.0) + + +def to_gray(frame: np.ndarray) -> np.ndarray: + if frame.ndim == 2: + return frame + return cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) + + +def clahe_gray(frame: np.ndarray) -> np.ndarray: + gray = to_gray(frame) + clahe = cv2.createCLAHE(clipLimit=2.2, tileGridSize=(8, 8)) + return clahe.apply(gray) + + +def _adaptive_cutoff(response: np.ndarray, sensitivity: float) -> float: + response = normalize01(response) + nonzero = response[response > 0] + if nonzero.size < 16: + return 1.0 + sensitivity = float(np.clip(sensitivity, 0.05, 0.95)) + percentile = 99.2 - sensitivity * 22.0 + percentile_cut = float(np.percentile(nonzero, percentile)) + try: + otsu_cut = float(threshold_otsu(nonzero)) + except ValueError: + otsu_cut = percentile_cut + return max(min(percentile_cut, 0.98), otsu_cut * 0.72) + + +def clean_mask(mask: np.ndarray, min_area: int = 12) -> np.ndarray: + binary = mask.astype(bool) + binary = remove_small_objects(binary, max_size=max(1, int(min_area))) + cleaned = binary.astype(np.uint8) * 255 + kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3)) + cleaned = cv2.morphologyEx(cleaned, cv2.MORPH_CLOSE, kernel, iterations=1) + return cleaned + + +def hessian_ridge_mask(frame: np.ndarray, sensitivity: float = 0.56) -> np.ndarray: + enhanced = clahe_gray(frame) + inverted = 255 - enhanced + normalized = normalize01(inverted) + response = frangi( + normalized, + sigmas=(0.7, 1.1, 1.7, 2.3), + alpha=0.55, + beta=0.55, + gamma=12, + black_ridges=False, + ) + response = normalize01(response) + cutoff = _adaptive_cutoff(response, sensitivity) + mask = response >= cutoff + return clean_mask(mask, min_area=10) + + +def edge_morphology_mask(frame: np.ndarray, sensitivity: float = 0.56) -> np.ndarray: + enhanced = clahe_gray(frame) + blur = cv2.GaussianBlur(enhanced, (3, 3), 0) + kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (15, 15)) + blackhat = cv2.morphologyEx(blur, cv2.MORPH_BLACKHAT, kernel) + dark_line = normalize01(blackhat) + cutoff = _adaptive_cutoff(dark_line, min(0.95, sensitivity + 0.1)) + candidate = (dark_line >= cutoff).astype(np.uint8) * 255 + low = int(20 + (1.0 - sensitivity) * 65) + high = int(70 + (1.0 - sensitivity) * 120) + edges = cv2.Canny(blur, low, high) + candidate = cv2.dilate(candidate, cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3)), iterations=1) + edges = cv2.bitwise_and(edges, candidate) + line_h = cv2.getStructuringElement(cv2.MORPH_RECT, (5, 1)) + line_v = cv2.getStructuringElement(cv2.MORPH_RECT, (1, 5)) + connected = cv2.morphologyEx(edges, cv2.MORPH_CLOSE, line_h, iterations=1) + connected = cv2.morphologyEx(connected, cv2.MORPH_CLOSE, line_v, iterations=1) + connected = cv2.bitwise_or(connected, cv2.erode(candidate, cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2, 2)))) + return clean_mask(connected, min_area=8) + + +def temporal_difference_mask( + frame: np.ndarray, + previous_frame: np.ndarray | None, + sensitivity: float = 0.56, +) -> np.ndarray: + ridge = hessian_ridge_mask(frame, sensitivity=sensitivity) + if previous_frame is None: + return ridge + current = cv2.GaussianBlur(clahe_gray(frame), (5, 5), 0) + previous = cv2.GaussianBlur(clahe_gray(previous_frame), (5, 5), 0) + diff = cv2.absdiff(current, previous) + diff = normalize01(diff) + cutoff = _adaptive_cutoff(diff, min(0.92, sensitivity + 0.12)) + moving = (diff >= cutoff).astype(np.uint8) * 255 + moving = cv2.dilate(moving, cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3)), iterations=1) + blended = cv2.bitwise_or(cv2.bitwise_and(ridge, moving), cv2.bitwise_and(ridge, cv2.dilate(moving, None))) + if int(np.count_nonzero(blended)) < 8: + blended = cv2.bitwise_or(ridge, moving) + return clean_mask(blended, min_area=8) + + +def fusion_mask( + frame: np.ndarray, + previous_frame: np.ndarray | None = None, + sensitivity: float = 0.56, +) -> np.ndarray: + ridge = hessian_ridge_mask(frame, sensitivity=sensitivity) + edge = edge_morphology_mask(frame, sensitivity=sensitivity) + temporal = temporal_difference_mask(frame, previous_frame, sensitivity=sensitivity) + votes = ( + (ridge > 0).astype(np.uint8) + + (edge > 0).astype(np.uint8) + + (temporal > 0).astype(np.uint8) + ) + fused = votes >= 2 + if int(np.count_nonzero(fused)) < 8: + fused = votes >= 1 + return clean_mask(fused, min_area=10) + + +def overlay_mask(frame: np.ndarray, mask: np.ndarray, color: tuple[int, int, int] = (0, 220, 255)) -> np.ndarray: + if frame.ndim == 2: + base = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR) + else: + base = frame.copy() + color_layer = np.zeros_like(base) + color_layer[mask > 0] = color + return cv2.addWeighted(base, 0.78, color_layer, 0.72, 0) + + +def mask_metrics(mask: np.ndarray) -> dict[str, float | int]: + binary = mask > 0 + coverage = float(np.count_nonzero(binary) / binary.size) if binary.size else 0.0 + skeleton = skeletonize(binary) + component_count, _ = cv2.connectedComponents(binary.astype(np.uint8)) + return { + "coverage": round(coverage, 6), + "mask_pixels": int(np.count_nonzero(binary)), + "skeleton_length": int(np.count_nonzero(skeleton)), + "components": max(0, int(component_count) - 1), + } + + +def segment_frame( + frame: np.ndarray, + method: str = "fusion", + previous_frame: np.ndarray | None = None, + sensitivity: float = 0.56, +) -> SegmentationOutput: + method_map: dict[str, Callable[..., np.ndarray]] = { + "hessian_ridge": hessian_ridge_mask, + "edge_morphology": edge_morphology_mask, + "temporal_difference": temporal_difference_mask, + "fusion": fusion_mask, + } + if method not in method_map: + raise ValueError(f"Unknown segmentation method: {method}") + if method in {"temporal_difference", "fusion"}: + mask = method_map[method](frame, previous_frame, sensitivity) + else: + mask = method_map[method](frame, sensitivity) + return SegmentationOutput( + method=method, + mask=mask, + overlay=overlay_mask(frame, mask), + metrics=mask_metrics(mask), + ) + + +def compare_frame( + frame: np.ndarray, + previous_frame: np.ndarray | None = None, + sensitivity: float = 0.56, +) -> list[SegmentationOutput]: + return [ + segment_frame(frame, "hessian_ridge", previous_frame, sensitivity), + segment_frame(frame, "edge_morphology", previous_frame, sensitivity), + segment_frame(frame, "temporal_difference", previous_frame, sensitivity), + segment_frame(frame, "fusion", previous_frame, sensitivity), + ] diff --git a/frontend/app.js b/frontend/app.js new file mode 100644 index 0000000..5ca09ee --- /dev/null +++ b/frontend/app.js @@ -0,0 +1,107 @@ +const form = document.querySelector("#segmentForm"); +const methodSelect = document.querySelector("#method"); +const sensitivity = document.querySelector("#sensitivity"); +const sensitivityValue = document.querySelector("#sensitivityValue"); +const fileInput = document.querySelector("#file"); +const fileName = document.querySelector("#fileName"); +const resultGrid = document.querySelector("#resultGrid"); +const emptyState = document.querySelector("#emptyState"); +const videoLink = document.querySelector("#videoLink"); +const health = document.querySelector("#health"); +const template = document.querySelector("#resultCardTemplate"); + +const methodLabels = new Map(); + +function setBusy(isBusy) { + const button = form.querySelector("button"); + button.disabled = isBusy; + button.querySelector("span").textContent = isBusy ? "分割中" : "开始分割"; +} + +async function loadHealth() { + try { + const response = await fetch("/api/health"); + if (!response.ok) throw new Error("bad health"); + const data = await response.json(); + health.textContent = `${data.service} ${data.version}`; + health.classList.add("ok"); + } catch { + health.textContent = "服务不可用"; + health.classList.add("bad"); + } +} + +async function loadMethods() { + const response = await fetch("/api/methods"); + const data = await response.json(); + methodSelect.innerHTML = ""; + Object.entries(data.methods).forEach(([key, value]) => { + methodLabels.set(key, value.label); + const option = document.createElement("option"); + option.value = key; + option.textContent = value.label; + if (key === "fusion") option.selected = true; + methodSelect.appendChild(option); + }); +} + +function renderResults(data) { + resultGrid.innerHTML = ""; + emptyState.hidden = true; + videoLink.hidden = !data.video_url; + if (data.video_url) { + videoLink.href = data.video_url; + videoLink.setAttribute("download", ""); + } + + data.frames.forEach((frame) => { + const node = template.content.firstElementChild.cloneNode(true); + node.querySelector(".method").textContent = methodLabels.get(frame.method) || frame.method; + node.querySelector(".frame-index").textContent = `帧 ${frame.frame_index}`; + node.querySelector(".overlay").src = frame.overlay_url; + node.querySelector(".mask").src = frame.mask_url; + node.querySelector(".coverage").textContent = `${(frame.metrics.coverage * 100).toFixed(3)}%`; + node.querySelector(".skeleton").textContent = frame.metrics.skeleton_length; + node.querySelector(".components").textContent = frame.metrics.components; + resultGrid.appendChild(node); + }); +} + +sensitivity.addEventListener("input", () => { + sensitivityValue.textContent = Number(sensitivity.value).toFixed(2); +}); + +fileInput.addEventListener("change", () => { + const file = fileInput.files[0]; + fileName.textContent = file ? file.name : "支持 mp4、avi、png、jpg、tiff"; +}); + +form.addEventListener("submit", async (event) => { + event.preventDefault(); + setBusy(true); + emptyState.hidden = false; + emptyState.textContent = "正在抽帧和分割,请稍候。"; + resultGrid.innerHTML = ""; + videoLink.hidden = true; + + try { + const payload = new FormData(form); + const response = await fetch("/api/segment", { + method: "POST", + body: payload, + }); + const data = await response.json(); + if (!response.ok) { + throw new Error(data.detail || "分割失败"); + } + renderResults(data); + } catch (error) { + emptyState.hidden = false; + emptyState.textContent = error.message; + } finally { + setBusy(false); + } +}); + +loadHealth(); +loadMethods(); diff --git a/frontend/index.html b/frontend/index.html new file mode 100644 index 0000000..3a593f8 --- /dev/null +++ b/frontend/index.html @@ -0,0 +1,96 @@ + + +
+ + +ISISeg
+Result
+